From e29c0616d6b9ecda2badcaf7b69614e511ac031b Mon Sep 17 00:00:00 2001
From: Dave Borowitz
Date: Fri, 21 Nov 2025 09:52:10 -0800
Subject: [PATCH] feat(serverless-spark)!: add Cloud Console and Logging URLs to get_batch

These are useful links for humans to follow for more information (output,
metrics, logs) that's not readily available via MCP.
---
 .../serverless-spark-get-batch.md             |  73 ++++++-----
 internal/tools/serverlessspark/common/urls.go |  91 +++++++++++++
 .../tools/serverlessspark/common/urls_test.go | 120 ++++++++++++++++++
 .../serverlesssparkgetbatch.go                |  19 ++-
 .../serverless_spark_integration_test.go      |  25 +++-
 5 files changed, 291 insertions(+), 37 deletions(-)
 create mode 100644 internal/tools/serverlessspark/common/urls.go
 create mode 100644 internal/tools/serverlessspark/common/urls_test.go

diff --git a/docs/en/resources/tools/serverless-spark/serverless-spark-get-batch.md b/docs/en/resources/tools/serverless-spark/serverless-spark-get-batch.md
index 532af65344..754aab9fd9 100644
--- a/docs/en/resources/tools/serverless-spark/serverless-spark-get-batch.md
+++ b/docs/en/resources/tools/serverless-spark/serverless-spark-get-batch.md
@@ -34,43 +34,50 @@ tools:
 
 ## Response Format
 
-The response is a full Batch JSON object as defined in the [API
-spec](https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#Batch).
-Example with a reduced set of fields:
+The response contains the full Batch object as defined in the [API
+spec](https://cloud.google.com/dataproc-serverless/docs/reference/rest/v1/projects.locations.batches#Batch),
+plus additional fields `consoleUrl` and `logsUrl` where a human can go for more
+detailed information.
 
 ```json
 {
-  "createTime": "2025-10-10T15:15:21.303146Z",
-  "creator": "alice@example.com",
-  "labels": {
-    "goog-dataproc-batch-uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
-    "goog-dataproc-location": "us-central1"
-  },
-  "name": "projects/google.com:hadoop-cloud-dev/locations/us-central1/batches/alice-20251010-abcd",
-  "operation": "projects/google.com:hadoop-cloud-dev/regions/us-central1/operations/11111111-2222-3333-4444-555555555555",
-  "runtimeConfig": {
-    "properties": {
-      "spark:spark.driver.cores": "4",
-      "spark:spark.driver.memory": "12200m"
-    }
-  },
-  "sparkBatch": {
-    "jarFileUris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
-    "mainClass": "org.apache.spark.examples.SparkPi"
-  },
-  "state": "SUCCEEDED",
-  "stateHistory": [
-    {
-      "state": "PENDING",
-      "stateStartTime": "2025-10-10T15:15:21.303146Z"
-    },
-    {
-      "state": "RUNNING",
-      "stateStartTime": "2025-10-10T15:16:41.291747Z"
-    }
-  ],
-  "stateTime": "2025-10-10T15:17:21.265493Z",
-  "uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
+  "batch": {
+    "createTime": "2025-10-10T15:15:21.303146Z",
+    "creator": "alice@example.com",
+    "labels": {
+      "goog-dataproc-batch-uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
+      "goog-dataproc-location": "us-central1"
+    },
+    "name": "projects/google.com:hadoop-cloud-dev/locations/us-central1/batches/alice-20251010-abcd",
+    "operation": "projects/google.com:hadoop-cloud-dev/regions/us-central1/operations/11111111-2222-3333-4444-555555555555",
+    "runtimeConfig": {
+      "properties": {
+        "spark:spark.driver.cores": "4",
+        "spark:spark.driver.memory": "12200m"
+      }
+    },
+    "sparkBatch": {
+      "jarFileUris": [
+        "file:///usr/lib/spark/examples/jars/spark-examples.jar"
+      ],
+      "mainClass": "org.apache.spark.examples.SparkPi"
+    },
+    "state": "SUCCEEDED",
+    "stateHistory": [
+      {
+        "state": "PENDING",
+        "stateStartTime": "2025-10-10T15:15:21.303146Z"
+      },
+      {
+        "state": "RUNNING",
+        "stateStartTime": "2025-10-10T15:16:41.291747Z"
+      }
+    ],
+    "stateTime": "2025-10-10T15:17:21.265493Z",
+    "uuid": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
+  },
+  "consoleUrl": "https://console.cloud.google.com/dataproc/batches/...",
+  "logsUrl": "https://console.cloud.google.com/logs/viewer?..."
 }
 ```
 
diff --git a/internal/tools/serverlessspark/common/urls.go b/internal/tools/serverlessspark/common/urls.go
new file mode 100644
index 0000000000..3b52235992
--- /dev/null
+++ b/internal/tools/serverlessspark/common/urls.go
@@ -0,0 +1,91 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+import (
+	"fmt"
+	"net/url"
+	"regexp"
+	"time"
+
+	"cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
+)
+
+const (
+	logTimeBufferBefore = 1 * time.Minute
+	logTimeBufferAfter  = 10 * time.Minute
+)
+
+var batchFullNameRegex = regexp.MustCompile(`projects/(?P<project>[^/]+)/locations/(?P<location>[^/]+)/batches/(?P<batch>[^/]+)`)
+
+// ExtractBatchDetails extracts the project ID, location, and batch ID from a fully qualified batch name.
+func ExtractBatchDetails(batchName string) (projectID, location, batchID string, err error) {
+	matches := batchFullNameRegex.FindStringSubmatch(batchName)
+	if len(matches) < 4 {
+		return "", "", "", fmt.Errorf("failed to parse batch name: %s", batchName)
+	}
+	return matches[1], matches[2], matches[3], nil
+}
+
+// BatchConsoleURLFromProto builds a URL to the Google Cloud Console linking to the batch summary page.
+func BatchConsoleURLFromProto(batchPb *dataprocpb.Batch) (string, error) {
+	projectID, location, batchID, err := ExtractBatchDetails(batchPb.GetName())
+	if err != nil {
+		return "", err
+	}
+	return BatchConsoleURL(projectID, location, batchID), nil
+}
+
+// BatchLogsURLFromProto builds a URL to the Google Cloud Console showing Cloud Logging for the given batch and time range.
+func BatchLogsURLFromProto(batchPb *dataprocpb.Batch) (string, error) {
+	projectID, location, batchID, err := ExtractBatchDetails(batchPb.GetName())
+	if err != nil {
+		return "", err
+	}
+	createTime := batchPb.GetCreateTime().AsTime()
+	stateTime := batchPb.GetStateTime().AsTime()
+	return BatchLogsURL(projectID, location, batchID, createTime, stateTime), nil
+}
+
+// BatchConsoleURL builds a URL to the Google Cloud Console linking to the batch summary page.
+func BatchConsoleURL(projectID, location, batchID string) string {
+	return fmt.Sprintf("https://console.cloud.google.com/dataproc/batches/%s/%s/summary?project=%s", location, batchID, projectID)
+}
+
+// BatchLogsURL builds a URL to the Google Cloud Console showing Cloud Logging for the given batch and time range.
+//
+// The implementation adds some buffer before and after the provided times.
+func BatchLogsURL(projectID, location, batchID string, startTime, endTime time.Time) string {
+	advancedFilterTemplate := `resource.type="cloud_dataproc_batch"
+resource.labels.project_id="%s"
+resource.labels.location="%s"
+resource.labels.batch_id="%s"`
+	advancedFilter := fmt.Sprintf(advancedFilterTemplate, projectID, location, batchID)
+	if !startTime.IsZero() {
+		actualStart := startTime.Add(-1 * logTimeBufferBefore)
+		advancedFilter += fmt.Sprintf("\ntimestamp>=\"%s\"", actualStart.Format(time.RFC3339Nano))
+	}
+	if !endTime.IsZero() {
+		actualEnd := endTime.Add(logTimeBufferAfter)
+		advancedFilter += fmt.Sprintf("\ntimestamp<=\"%s\"", actualEnd.Format(time.RFC3339Nano))
+	}
+
+	v := url.Values{}
+	v.Add("resource", "cloud_dataproc_batch/batch_id/"+batchID)
+	v.Add("advancedFilter", advancedFilter)
+	v.Add("project", projectID)
+
+	return "https://console.cloud.google.com/logs/viewer?" + v.Encode()
+}
diff --git a/internal/tools/serverlessspark/common/urls_test.go b/internal/tools/serverlessspark/common/urls_test.go
new file mode 100644
index 0000000000..ea81857786
--- /dev/null
+++ b/internal/tools/serverlessspark/common/urls_test.go
@@ -0,0 +1,120 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package common
+
+import (
+	"testing"
+	"time"
+
+	"cloud.google.com/go/dataproc/v2/apiv1/dataprocpb"
+	"google.golang.org/protobuf/types/known/timestamppb"
+)
+
+func TestExtractBatchDetails_Success(t *testing.T) {
+	t.Parallel()
+	batchName := "projects/my-project/locations/us-central1/batches/my-batch"
+	projectID, location, batchID, err := ExtractBatchDetails(batchName)
+	if err != nil {
+		t.Errorf("ExtractBatchDetails() error = %v, wantErr %v", err, false)
+		return
+	}
+	if projectID != "my-project" {
+		t.Errorf("ExtractBatchDetails() projectID = %v, want %v", projectID, "my-project")
+	}
+	if location != "us-central1" {
+		t.Errorf("ExtractBatchDetails() location = %v, want %v", location, "us-central1")
+	}
+	if batchID != "my-batch" {
+		t.Errorf("ExtractBatchDetails() batchID = %v, want %v", batchID, "my-batch")
+	}
+}
+
+func TestExtractBatchDetails_Failure(t *testing.T) {
+	t.Parallel()
+	batchName := "invalid-name"
+	_, _, _, err := ExtractBatchDetails(batchName)
+	if err == nil {
+		t.Errorf("ExtractBatchDetails() error = %v, wantErr %v", err, true)
+	}
+}
+
+func TestBatchConsoleURL(t *testing.T) {
+	got := BatchConsoleURL("my-project", "us-central1", "my-batch")
+	want := "https://console.cloud.google.com/dataproc/batches/us-central1/my-batch/summary?project=my-project"
+	if got != want {
+		t.Errorf("BatchConsoleURL() = %v, want %v", got, want)
+	}
+}
+
+func TestBatchLogsURL(t *testing.T) {
+	t.Parallel()
+	startTime := time.Date(2025, 10, 1, 5, 0, 0, 0, time.UTC)
+	endTime := time.Date(2025, 10, 1, 6, 0, 0, 0, time.UTC)
+	got := BatchLogsURL("my-project", "us-central1", "my-batch", startTime, endTime)
+	want := "https://console.cloud.google.com/logs/viewer?advancedFilter=" +
+		"resource.type%3D%22cloud_dataproc_batch%22" +
+		"%0Aresource.labels.project_id%3D%22my-project%22" +
+		"%0Aresource.labels.location%3D%22us-central1%22" +
+		"%0Aresource.labels.batch_id%3D%22my-batch%22" +
+		"%0Atimestamp%3E%3D%222025-10-01T04%3A59%3A00Z%22" + // Minus 1 minute
+		"%0Atimestamp%3C%3D%222025-10-01T06%3A10%3A00Z%22" + // Plus 10 minutes
+		"&project=my-project" +
+		"&resource=cloud_dataproc_batch%2Fbatch_id%2Fmy-batch"
+	if got != want {
+		t.Errorf("BatchLogsURL() = %v, want %v", got, want)
+	}
+}
+
+func TestBatchConsoleURLFromProto(t *testing.T) {
+	t.Parallel()
+	batchPb := &dataprocpb.Batch{
+		Name: "projects/my-project/locations/us-central1/batches/my-batch",
+	}
+	got, err := BatchConsoleURLFromProto(batchPb)
+	if err != nil {
+		t.Fatalf("BatchConsoleURLFromProto() error = %v", err)
+	}
+	want := "https://console.cloud.google.com/dataproc/batches/us-central1/my-batch/summary?project=my-project"
+	if got != want {
+		t.Errorf("BatchConsoleURLFromProto() = %v, want %v", got, want)
+	}
+}
+
+func TestBatchLogsURLFromProto(t *testing.T) {
+	t.Parallel()
+	createTime := time.Date(2025, 10, 1, 5, 0, 0, 0, time.UTC)
+	stateTime := time.Date(2025, 10, 1, 6, 0, 0, 0, time.UTC)
+	batchPb := &dataprocpb.Batch{
+		Name:       "projects/my-project/locations/us-central1/batches/my-batch",
+		CreateTime: timestamppb.New(createTime),
+		StateTime:  timestamppb.New(stateTime),
+	}
+	got, err := BatchLogsURLFromProto(batchPb)
+	if err != nil {
+		t.Fatalf("BatchLogsURLFromProto() error = %v", err)
+	}
+	want := "https://console.cloud.google.com/logs/viewer?advancedFilter=" +
+		"resource.type%3D%22cloud_dataproc_batch%22" +
+		"%0Aresource.labels.project_id%3D%22my-project%22" +
+		"%0Aresource.labels.location%3D%22us-central1%22" +
+		"%0Aresource.labels.batch_id%3D%22my-batch%22" +
+		"%0Atimestamp%3E%3D%222025-10-01T04%3A59%3A00Z%22" + // Minus 1 minute
+		"%0Atimestamp%3C%3D%222025-10-01T06%3A10%3A00Z%22" + // Plus 10 minutes
+		"&project=my-project" +
+		"&resource=cloud_dataproc_batch%2Fbatch_id%2Fmy-batch"
+	if got != want {
+		t.Errorf("BatchLogsURLFromProto() = %v, want %v", got, want)
+	}
+}
diff --git a/internal/tools/serverlessspark/serverlesssparkgetbatch/serverlesssparkgetbatch.go b/internal/tools/serverlessspark/serverlesssparkgetbatch/serverlesssparkgetbatch.go
index b94581d903..558910cb9f 100644
--- a/internal/tools/serverlessspark/serverlesssparkgetbatch/serverlesssparkgetbatch.go
+++ b/internal/tools/serverlessspark/serverlesssparkgetbatch/serverlesssparkgetbatch.go
@@ -25,6 +25,7 @@ import (
 	"github.com/googleapis/genai-toolbox/internal/sources"
 	"github.com/googleapis/genai-toolbox/internal/sources/serverlessspark"
 	"github.com/googleapis/genai-toolbox/internal/tools"
+	"github.com/googleapis/genai-toolbox/internal/tools/serverlessspark/common"
 	"github.com/googleapis/genai-toolbox/internal/util/parameters"
 	"google.golang.org/protobuf/encoding/protojson"
 )
@@ -142,9 +143,23 @@ func (t Tool) Invoke(ctx context.Context, resourceMgr tools.SourceProvider, para
 		return nil, fmt.Errorf("failed to unmarshal batch JSON: %w", err)
 	}
 
-	return result, nil
-}
+	consoleUrl, err := common.BatchConsoleURLFromProto(batchPb)
+	if err != nil {
+		return nil, fmt.Errorf("error generating console url: %v", err)
+	}
+	logsUrl, err := common.BatchLogsURLFromProto(batchPb)
+	if err != nil {
+		return nil, fmt.Errorf("error generating logs url: %v", err)
+	}
+	wrappedResult := map[string]any{
+		"consoleUrl": consoleUrl,
+		"logsUrl":    logsUrl,
+		"batch":      result,
+	}
+
+	return wrappedResult, nil
+}
 
 func (t Tool) ParseParams(data map[string]any, claims map[string]map[string]any) (parameters.ParamValues, error) {
 	return parameters.ParseParams(t.Parameters, data, claims)
 }
diff --git a/tests/serverlessspark/serverless_spark_integration_test.go b/tests/serverlessspark/serverless_spark_integration_test.go
index 7446e15ff2..6da91343c5 100644
--- a/tests/serverlessspark/serverless_spark_integration_test.go
+++ b/tests/serverlessspark/serverless_spark_integration_test.go
@@ -48,6 +48,11 @@ var (
 	serverlessSparkServiceAccount = os.Getenv("SERVERLESS_SPARK_SERVICE_ACCOUNT")
 )
 
+const (
+	batchURLPrefix = "https://console.cloud.google.com/dataproc/batches/"
+	logsURLPrefix  = "https://console.cloud.google.com/logs/viewer?"
+)
+
 func getServerlessSparkVars(t *testing.T) map[string]any {
 	switch "" {
 	case serverlessSparkLocation:
@@ -868,11 +873,27 @@ func runGetBatchTest(t *testing.T, client *dataproc.BatchControllerClient, ctx c
 	if !ok {
 		t.Fatalf("unable to find result in response body")
 	}
+	var wrappedResult map[string]any
+	if err := json.Unmarshal([]byte(result), &wrappedResult); err != nil {
+		t.Fatalf("error unmarshalling result: %s", err)
+	}
+	consoleURL, ok := wrappedResult["consoleUrl"].(string)
+	if !ok || !strings.HasPrefix(consoleURL, batchURLPrefix) {
+		t.Errorf("unexpected consoleUrl: %v", consoleURL)
+	}
+	logsURL, ok := wrappedResult["logsUrl"].(string)
+	if !ok || !strings.HasPrefix(logsURL, logsURLPrefix) {
+		t.Errorf("unexpected logsUrl: %v", logsURL)
+	}
+	batchJSON, err := json.Marshal(wrappedResult["batch"])
+	if err != nil {
+		t.Fatalf("failed to marshal batch: %v", err)
+	}
 
 	// Unmarshal JSON to proto for proto-aware deep comparison.
 	var batch dataprocpb.Batch
-	if err := protojson.Unmarshal([]byte(result), &batch); err != nil {
-		t.Fatalf("error unmarshalling result: %s", err)
+	if err := protojson.Unmarshal(batchJSON, &batch); err != nil {
+		t.Fatalf("error unmarshalling batch from wrapped result: %s", err)
 	}
 
 	if !cmp.Equal(&batch, tc.want, protocmp.Transform()) {
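
For reference, a minimal sketch of how a caller of the get_batch tool might consume the
wrapped result introduced by this patch. The consoleUrl, logsUrl, and batch keys come from
the response format documented above; the wrappedBatch struct, the sample payload, and its
values are illustrative assumptions and are not part of the patch.

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// wrappedBatch mirrors the wrapped get_batch result: the raw Batch JSON plus
// the two human-facing Cloud Console URLs added by this change.
type wrappedBatch struct {
	ConsoleURL string          `json:"consoleUrl"`
	LogsURL    string          `json:"logsUrl"`
	Batch      json.RawMessage `json:"batch"`
}

func main() {
	// Illustrative payload; a real caller would receive this from the tool invocation.
	payload := []byte(`{
		"consoleUrl": "https://console.cloud.google.com/dataproc/batches/us-central1/my-batch/summary?project=my-project",
		"logsUrl": "https://console.cloud.google.com/logs/viewer?project=my-project",
		"batch": {"state": "SUCCEEDED"}
	}`)

	var result wrappedBatch
	if err := json.Unmarshal(payload, &result); err != nil {
		log.Fatalf("unmarshal wrapped result: %v", err)
	}
	fmt.Println("Console:", result.ConsoleURL)
	fmt.Println("Logs:", result.LogsURL)
	fmt.Println("Batch JSON bytes:", len(result.Batch))
}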