feat(serverless-spark)!: add Cloud Console and Logging URLs to get_batch

These are useful links for humans to follow for more information
(output, metrics, logs) that's not readily available via MCP.
This commit is contained in:
Dave Borowitz
2025-11-21 09:52:10 -08:00
parent 285aa46b88
commit e29c0616d6
5 changed files with 291 additions and 37 deletions

View File

@@ -34,43 +34,50 @@ tools:
## Response Format
The response is a full Batch JSON object as defined in the [API
spec](https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#Batch).
Example with a reduced set of fields:
The response contains the full Batch object as defined in the [API
spec](https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#Batch),
plus additional fields `consoleUrl` and `logsUrl` where a human can go for more
detailed information.
```json
{
"createTime": "2025-10-10T15:15:21.303146Z",
"creator": "alice@example.com",
"labels": {
"goog-dataproc-batch-uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
"goog-dataproc-location": "us-central1"
},
"name": "projects/google.com:hadoop-cloud-dev/locations/us-central1/batches/alice-20251010-abcd",
"operation": "projects/google.com:hadoop-cloud-dev/regions/us-central1/operations/11111111-2222-3333-4444-555555555555",
"runtimeConfig": {
"properties": {
"spark:spark.driver.cores": "4",
"spark:spark.driver.memory": "12200m"
}
},
"sparkBatch": {
"jarFileUris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"mainClass": "org.apache.spark.examples.SparkPi"
},
"state": "SUCCEEDED",
"stateHistory": [
{
"state": "PENDING",
"stateStartTime": "2025-10-10T15:15:21.303146Z"
"batch": {
"createTime": "2025-10-10T15:15:21.303146Z",
"creator": "alice@example.com",
"labels": {
"goog-dataproc-batch-uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
"goog-dataproc-location": "us-central1"
},
{
"state": "RUNNING",
"stateStartTime": "2025-10-10T15:16:41.291747Z"
}
],
"stateTime": "2025-10-10T15:17:21.265493Z",
"uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
"name": "projects/google.com:hadoop-cloud-dev/locations/us-central1/batches/alice-20251010-abcd",
"operation": "projects/google.com:hadoop-cloud-dev/regions/us-central1/operations/11111111-2222-3333-4444-555555555555",
"runtimeConfig": {
"properties": {
"spark:spark.driver.cores": "4",
"spark:spark.driver.memory": "12200m"
}
},
"sparkBatch": {
"jarFileUris": [
"file:///usr/lib/spark/examples/jars/spark-examples.jar"
],
"mainClass": "org.apache.spark.examples.SparkPi"
},
"state": "SUCCEEDED",
"stateHistory": [
{
"state": "PENDING",
"stateStartTime": "2025-10-10T15:15:21.303146Z"
},
{
"state": "RUNNING",
"stateStartTime": "2025-10-10T15:16:41.291747Z"
}
],
"stateTime": "2025-10-10T15:17:21.265493Z",
"uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
},
"consoleUrl": "https://console.cloud.google.com/dataproc/batches/...",
"logsUrl": "https://console.cloud.google.com/logs/viewer?..."
}
```

View File

@@ -0,0 +1,91 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package common
import (
"fmt"
"net/url"
"regexp"
"time"
"cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
)
// Buffers applied around the batch's lifetime when building the Cloud Logging
// time-range filter, so logs emitted slightly before creation or shortly
// after the final state change are still included in the view.
const (
	logTimeBufferBefore = 1 * time.Minute
	logTimeBufferAfter  = 10 * time.Minute
)
// batchFullNameRegex matches fully qualified batch resource names of the form
// projects/<project>/locations/<location>/batches/<batch_id>.
var batchFullNameRegex = regexp.MustCompile(`projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)/batches/(?P<batch_id>[^/]+)`)

// ExtractBatchDetails extracts the project ID, location, and batch ID from a
// fully qualified batch name.
//
// It returns an error if batchName does not contain a
// projects/*/locations/*/batches/* segment.
func ExtractBatchDetails(batchName string) (projectID, location, batchID string, err error) {
	matches := batchFullNameRegex.FindStringSubmatch(batchName)
	// matches[0] is the full match; the three capture groups follow.
	if len(matches) < 4 {
		return "", "", "", fmt.Errorf("failed to parse batch name: %s", batchName)
	}
	return matches[1], matches[2], matches[3], nil
}
// BatchConsoleURLFromProto builds a URL to the Google Cloud Console linking to
// the batch summary page, deriving the path components from the batch's name.
func BatchConsoleURLFromProto(batchPb *dataprocpb.Batch) (string, error) {
	project, loc, id, err := ExtractBatchDetails(batchPb.GetName())
	if err != nil {
		return "", err
	}
	return BatchConsoleURL(project, loc, id), nil
}
// BatchLogsURLFromProto builds a URL to the Google Cloud Console showing Cloud
// Logging for the given batch, scoped to the batch's create/state time range.
func BatchLogsURLFromProto(batchPb *dataprocpb.Batch) (string, error) {
	project, loc, id, err := ExtractBatchDetails(batchPb.GetName())
	if err != nil {
		return "", err
	}
	// Create time and state time bound the log window; zero values are
	// handled by BatchLogsURL.
	return BatchLogsURL(project, loc, id, batchPb.GetCreateTime().AsTime(), batchPb.GetStateTime().AsTime()), nil
}
// BatchConsoleURL builds a URL to the Google Cloud Console linking to the
// batch summary page for the given project, location, and batch ID.
func BatchConsoleURL(projectID, location, batchID string) string {
	const base = "https://console.cloud.google.com/dataproc/batches"
	return fmt.Sprintf("%s/%s/%s/summary?project=%s", base, location, batchID, projectID)
}
// BatchLogsURL builds a URL to the Google Cloud Console showing Cloud Logging
// for the given batch and time range.
//
// The implementation adds some buffer before and after the provided times.
func BatchLogsURL(projectID, location, batchID string, startTime, endTime time.Time) string {
	filter := fmt.Sprintf("resource.type=\"cloud_dataproc_batch\"\n"+
		"resource.labels.project_id=\"%s\"\n"+
		"resource.labels.location=\"%s\"\n"+
		"resource.labels.batch_id=\"%s\"", projectID, location, batchID)
	// Widen the window on each bounded side; a zero time leaves that side
	// of the range open.
	if !startTime.IsZero() {
		from := startTime.Add(-logTimeBufferBefore)
		filter += fmt.Sprintf("\ntimestamp>=\"%s\"", from.Format(time.RFC3339Nano))
	}
	if !endTime.IsZero() {
		to := endTime.Add(logTimeBufferAfter)
		filter += fmt.Sprintf("\ntimestamp<=\"%s\"", to.Format(time.RFC3339Nano))
	}
	params := url.Values{
		"resource":       []string{"cloud_dataproc_batch/batch_id/" + batchID},
		"advancedFilter": []string{filter},
		"project":        []string{projectID},
	}
	return "https://console.cloud.google.com/logs/viewer?" + params.Encode()
}

View File

@@ -0,0 +1,120 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package common
import (
"testing"
"time"
"cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
"google.golang.org/protobuf/types/known/timestamppb"
)
// TestExtractBatchDetails_Success checks parsing of a well-formed batch name.
func TestExtractBatchDetails_Success(t *testing.T) {
	t.Parallel()
	const fullName = "projects/my-project/locations/us-central1/batches/my-batch"
	gotProject, gotLocation, gotBatch, err := ExtractBatchDetails(fullName)
	if err != nil {
		t.Errorf("ExtractBatchDetails() error = %v, wantErr %v", err, false)
		return
	}
	if want := "my-project"; gotProject != want {
		t.Errorf("ExtractBatchDetails() projectID = %v, want %v", gotProject, want)
	}
	if want := "us-central1"; gotLocation != want {
		t.Errorf("ExtractBatchDetails() location = %v, want %v", gotLocation, want)
	}
	if want := "my-batch"; gotBatch != want {
		t.Errorf("ExtractBatchDetails() batchID = %v, want %v", gotBatch, want)
	}
}
// TestExtractBatchDetails_Failure checks that a malformed name is rejected.
func TestExtractBatchDetails_Failure(t *testing.T) {
	t.Parallel()
	if _, _, _, err := ExtractBatchDetails("invalid-name"); err == nil {
		t.Errorf("ExtractBatchDetails() error = %v, wantErr %v", err, true)
	}
}
// TestBatchConsoleURL verifies the console summary URL built for a batch.
func TestBatchConsoleURL(t *testing.T) {
	// Added for consistency: every other test in this file runs in parallel.
	t.Parallel()
	got := BatchConsoleURL("my-project", "us-central1", "my-batch")
	want := "https://console.cloud.google.com/dataproc/batches/us-central1/my-batch/summary?project=my-project"
	if got != want {
		t.Errorf("BatchConsoleURL() = %v, want %v", got, want)
	}
}
// TestBatchLogsURL verifies the logs-viewer URL, including the buffered
// timestamp range (1 minute before start, 10 minutes after end).
func TestBatchLogsURL(t *testing.T) {
	t.Parallel()
	begin := time.Date(2025, 10, 1, 5, 0, 0, 0, time.UTC)
	finish := time.Date(2025, 10, 1, 6, 0, 0, 0, time.UTC)
	want := "https://console.cloud.google.com/logs/viewer?advancedFilter=" +
		"resource.type%3D%22cloud_dataproc_batch%22" +
		"%0Aresource.labels.project_id%3D%22my-project%22" +
		"%0Aresource.labels.location%3D%22us-central1%22" +
		"%0Aresource.labels.batch_id%3D%22my-batch%22" +
		"%0Atimestamp%3E%3D%222025-10-01T04%3A59%3A00Z%22" + // Minus 1 minute
		"%0Atimestamp%3C%3D%222025-10-01T06%3A10%3A00Z%22" + // Plus 10 minutes
		"&project=my-project" +
		"&resource=cloud_dataproc_batch%2Fbatch_id%2Fmy-batch"
	if got := BatchLogsURL("my-project", "us-central1", "my-batch", begin, finish); got != want {
		t.Errorf("BatchLogsURL() = %v, want %v", got, want)
	}
}
// TestBatchConsoleURLFromProto verifies the console URL derived from a Batch proto.
func TestBatchConsoleURLFromProto(t *testing.T) {
	t.Parallel()
	pb := &dataprocpb.Batch{Name: "projects/my-project/locations/us-central1/batches/my-batch"}
	got, err := BatchConsoleURLFromProto(pb)
	if err != nil {
		t.Fatalf("BatchConsoleURLFromProto() error = %v", err)
	}
	const want = "https://console.cloud.google.com/dataproc/batches/us-central1/my-batch/summary?project=my-project"
	if got != want {
		t.Errorf("BatchConsoleURLFromProto() = %v, want %v", got, want)
	}
}
// TestBatchLogsURLFromProto verifies the logs URL derived from a Batch proto's
// name, create time, and state time.
func TestBatchLogsURLFromProto(t *testing.T) {
	t.Parallel()
	created := time.Date(2025, 10, 1, 5, 0, 0, 0, time.UTC)
	lastState := time.Date(2025, 10, 1, 6, 0, 0, 0, time.UTC)
	pb := &dataprocpb.Batch{
		Name:       "projects/my-project/locations/us-central1/batches/my-batch",
		CreateTime: timestamppb.New(created),
		StateTime:  timestamppb.New(lastState),
	}
	got, err := BatchLogsURLFromProto(pb)
	if err != nil {
		t.Fatalf("BatchLogsURLFromProto() error = %v", err)
	}
	want := "https://console.cloud.google.com/logs/viewer?advancedFilter=" +
		"resource.type%3D%22cloud_dataproc_batch%22" +
		"%0Aresource.labels.project_id%3D%22my-project%22" +
		"%0Aresource.labels.location%3D%22us-central1%22" +
		"%0Aresource.labels.batch_id%3D%22my-batch%22" +
		"%0Atimestamp%3E%3D%222025-10-01T04%3A59%3A00Z%22" + // Minus 1 minute
		"%0Atimestamp%3C%3D%222025-10-01T06%3A10%3A00Z%22" + // Plus 10 minutes
		"&project=my-project" +
		"&resource=cloud_dataproc_batch%2Fbatch_id%2Fmy-batch"
	if got != want {
		t.Errorf("BatchLogsURLFromProto() = %v, want %v", got, want)
	}
}

View File

@@ -25,6 +25,7 @@ import (
"github.com/googleapis/genai-toolbox/internal/sources"
"github.com/googleapis/genai-toolbox/internal/sources/serverlessspark"
"github.com/googleapis/genai-toolbox/internal/tools"
"github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/common"
"github.com/googleapis/genai-toolbox/internal/util/parameters"
"google.golang.org/protobuf/encoding/protojson"
)
@@ -142,9 +143,23 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
return nil, fmt.Errorf("failed to unmarshal batch JSON: %w", err)
}
return result, nil
}
consoleUrl, err := common.BatchConsoleURLFromProto(batchPb)
if err != nil {
return nil, fmt.Errorf("error generating console url: %v", err)
}
logsUrl, err := common.BatchLogsURLFromProto(batchPb)
if err != nil {
return nil, fmt.Errorf("error generating logs url: %v", err)
}
wrappedResult := map[string]any{
"consoleUrl": consoleUrl,
"logsUrl": logsUrl,
"batch": result,
}
return wrappedResult, nil
}
// ParseParams parses the raw request data and auth claims into typed
// parameter values, delegating to parameters.ParseParams with this tool's
// declared parameters.
func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {
	return parameters.ParseParams(t.Parameters, data, claims)
}

View File

@@ -48,6 +48,11 @@ var (
serverlessSparkServiceAccount = os.Getenv("SERVERLESS_SPARK_SERVICE_ACCOUNT")
)
const (
batchURLPrefix = "https://console.cloud.google.com/dataproc/batches/"
logsURLPrefix = "https://console.cloud.google.com/logs/viewer?"
)
func getServerlessSparkVars(t *testing.T) map[string]any {
switch "" {
case serverlessSparkLocation:
@@ -868,11 +873,27 @@ func runGetBatchTest(t *testing.T, client *dataproc.BatchControllerClient, ctx c
if !ok {
t.Fatalf("unable to find result in response body")
}
var wrappedResult map[string]any
if err := json.Unmarshal([]byte(result), &wrappedResult); err != nil {
t.Fatalf("error unmarshalling result: %s", err)
}
consoleURL, ok := wrappedResult["consoleUrl"].(string)
if !ok || !strings.HasPrefix(consoleURL, batchURLPrefix) {
t.Errorf("unexpected consoleUrl: %v", consoleURL)
}
logsURL, ok := wrappedResult["logsUrl"].(string)
if !ok || !strings.HasPrefix(logsURL, logsURLPrefix) {
t.Errorf("unexpected logsUrl: %v", logsURL)
}
batchJSON, err := json.Marshal(wrappedResult["batch"])
if err != nil {
t.Fatalf("failed to marshal batch: %v", err)
}
// Unmarshal JSON to proto for proto-aware deep comparison.
var batch dataprocpb.Batch
if err := protojson.Unmarshal([]byte(result), &batch); err != nil {
t.Fatalf("error unmarshalling result: %s", err)
if err := protojson.Unmarshal(batchJSON, &batch); err != nil {
t.Fatalf("error unmarshalling batch from wrapped result: %s", err)
}
if !cmp.Equal(&batch, tc.want, protocmp.Transform()) {