Files
AutoGPT/autogpt_platform/backend/backend/blocks/google/docs.py
Nicholas Tindle cb08def96c feat(blocks): Add Google Docs integration blocks (#11608)
Introduces a new module with blocks for Google Docs operations,
including reading, creating, appending, inserting, formatting,
exporting, sharing, and managing public access for Google Docs. Updates
dependencies in pyproject.toml and poetry.lock to support these
features.



https://github.com/user-attachments/assets/3597366b-a9eb-4f8e-8a0a-5a0bc8ebc09b



<!-- Clearly explain the need for these changes: -->

### Changes 🏗️
Adds a set of basic Google Docs blocks, plus a dependency (`gravitas-md2gdocs`) used to convert Markdown into Google Docs API requests.

Block | Description | Key Features
-- | -- | --
Read & Create |   |  
GoogleDocsReadBlock | Read content from a Google Doc | Returns text content, title, and the document for chaining
GoogleDocsCreateBlock | Create a new Google Doc | Title, optional
initial content
GoogleDocsGetMetadataBlock | Get document metadata | Title, document ID, revision ID, document URL
GoogleDocsGetStructureBlock | Get document structure with indexes | Flat
segments or detailed hierarchy; shows start/end indexes
Plain Text Operations |   |  
GoogleDocsAppendPlainTextBlock | Append plain text to end | No
formatting applied
GoogleDocsInsertPlainTextBlock | Insert plain text at position |
Requires index; no formatting
GoogleDocsFindReplacePlainTextBlock | Find and replace plain text |
Case-sensitive option; no formatting on replacement
Markdown Operations | (ideal for LLM/AI output) |  
GoogleDocsAppendMarkdownBlock | Append Markdown to end | Full formatting
via gravitas-md2gdocs
GoogleDocsInsertMarkdownAtBlock | Insert Markdown at position | Requires
index
GoogleDocsReplaceAllWithMarkdownBlock | Replace entire doc with Markdown
| Clears and rewrites
GoogleDocsReplaceRangeWithMarkdownBlock | Replace index range with
Markdown | Requires start/end index
GoogleDocsReplaceContentWithMarkdownBlock | Find text and replace with
Markdown | Text-based search; great for templates
Structural Operations |   |  
GoogleDocsInsertTableBlock | Insert a table | Rows/columns OR content
array; optional Markdown in cells
GoogleDocsInsertPageBreakBlock | Insert a page break | Position index (0
= end)
GoogleDocsDeleteContentBlock | Delete content range | Requires start/end
index
GoogleDocsFormatTextBlock | Apply formatting to text range | Bold,
italic, underline, font size/color, etc.
Export & Sharing |   |  
GoogleDocsExportBlock | Export to different formats | PDF, DOCX, TXT,
HTML, RTF, ODT, EPUB
GoogleDocsShareBlock | Share with specific users | Reader, commenter,
writer, owner roles
GoogleDocsSetPublicAccessBlock | Set public access level | Private,
anyone with link (view/comment/edit)


<!-- Concisely describe all of the changes made in this pull request:
-->

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Build, run, verify, and upload a block super test
  - [x] [Google Docs Super Agent_v8.json](https://github.com/user-attachments/files/24134215/Google.Docs.Super.Agent_v8.json) works


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Chores**
  * Updated backend dependencies.

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> Adds end-to-end Google Docs capabilities under
`backend/blocks/google/docs.py`, including rich Markdown support.
> 
> - New blocks: read/create docs; plain-text
`append`/`insert`/`find_replace`/`delete`; text `format`;
`insert_table`; `insert_page_break`; `get_metadata`; `get_structure`
> - Markdown-powered blocks (via `gravitas_md2gdocs.to_requests`):
`append_markdown`, `insert_markdown_at`, `replace_all_with_markdown`,
`replace_range_with_markdown`, `replace_content_with_markdown`
> - Export and sharing: `export` (PDF/DOCX/TXT/HTML/RTF/ODT/EPUB),
`share` (user roles), `set_public_access`
> - Dependency updates: add `gravitas-md2gdocs` to `pyproject.toml` and
update `poetry.lock`
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
73512a95b2. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com>
Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
2026-01-05 18:36:56 +00:00

2897 lines
106 KiB
Python

import asyncio
import re
from enum import Enum
from typing import Any
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from gravitas_md2gdocs import to_requests
from backend.blocks.google._drive import GoogleDriveFile, GoogleDriveFileField
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.settings import Settings
from ._auth import (
GOOGLE_OAUTH_IS_CONFIGURED,
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
GoogleCredentials,
GoogleCredentialsField,
GoogleCredentialsInput,
)
# Application settings; the service builders below read the Google OAuth
# client ID and client secret from `settings.secrets`.
settings = Settings()
# Passed as `disabled=` to the blocks below, so they are switched off whenever
# Google OAuth is not configured.
GOOGLE_DOCS_DISABLED = not GOOGLE_OAUTH_IS_CONFIGURED
# ============ Enums ============
# Role values offered for public access. Only read-only and comment access are
# listed here.
# NOTE(review): presumably consumed by the public-access block defined later
# in this file — confirm.
class PublicAccessRole(str, Enum):
    READER = "reader"
    COMMENTER = "commenter"
# Role values offered when sharing a document with a specific user.
# NOTE(review): presumably consumed by the share block defined later in this
# file, and presumably mirrors Google Drive permission role names — confirm.
class ShareRole(str, Enum):
    READER = "reader"
    WRITER = "writer"
    COMMENTER = "commenter"
# ============ Helper Functions ============
def _to_google_oauth_credentials(credentials: GoogleCredentials) -> Credentials:
    """Convert stored platform credentials into google-auth ``Credentials``.

    Shared by the Docs and Drive service builders so both are configured
    identically.

    Args:
        credentials: Platform credentials holding the (optional) access and
            refresh tokens and the granted scopes.

    Returns:
        A ``Credentials`` object using the client ID/secret from ``settings``.
    """
    access_token = credentials.access_token
    refresh_token = credentials.refresh_token
    return Credentials(
        # Either token may be absent; pass None instead of unwrapping it.
        token=access_token.get_secret_value() if access_token else None,
        refresh_token=refresh_token.get_secret_value() if refresh_token else None,
        token_uri="https://oauth2.googleapis.com/token",
        client_id=settings.secrets.google_client_id,
        client_secret=settings.secrets.google_client_secret,
        scopes=credentials.scopes,
    )


def _build_docs_service(credentials: GoogleCredentials):
    """Build Google Docs API service."""
    return build(
        "docs",
        "v1",
        credentials=_to_google_oauth_credentials(credentials),
        cache_discovery=False,
    )


def _build_drive_service(credentials: GoogleCredentials):
    """Build Google Drive API service for file operations."""
    return build(
        "drive",
        "v3",
        credentials=_to_google_oauth_credentials(credentials),
        cache_discovery=False,
    )
def _validate_document_file(file: GoogleDriveFile) -> str | None:
    """Check that ``file`` can be treated as a Google Doc.

    Returns:
        An error message when the file has no ID or carries a MIME type other
        than the Google Docs one; ``None`` when the file is acceptable. A file
        with no MIME type set is accepted.
    """
    if not file.id:
        return "No document ID provided"
    mime_type = file.mime_type
    if not mime_type or mime_type == "application/vnd.google-apps.document":
        return None
    return f"File is not a Google Doc (type: {mime_type})"
def _parse_hex_color_to_rgb_floats(value: str) -> tuple[float, float, float] | None:
"""
Parse a CSS-like hex color string into normalized RGB floats.
Supports:
- #RGB / RGB (shorthand)
- #RRGGBB / RRGGBB
Returns None for malformed inputs.
"""
if not value:
return None
raw = value.strip()
if raw.startswith("#"):
raw = raw[1:]
if not re.fullmatch(r"[0-9a-fA-F]{3}([0-9a-fA-F]{3})?", raw):
return None
if len(raw) == 3:
raw = "".join(ch * 2 for ch in raw)
r = int(raw[0:2], 16) / 255.0
g = int(raw[2:4], 16) / 255.0
b = int(raw[4:6], 16) / 255.0
return (r, g, b)
def _get_document_end_index(service, document_id: str) -> int:
"""Get the index at the end of the document body."""
doc = service.documents().get(documentId=document_id).execute()
body = doc.get("body", {})
content = body.get("content", [])
if content:
last_element = content[-1]
return last_element.get("endIndex", 1) - 1
return 1
def _extract_text_from_content(content: list[dict]) -> str:
"""Extract plain text from document content structure."""
text_parts = []
for element in content:
if "paragraph" in element:
for elem in element["paragraph"].get("elements", []):
if "textRun" in elem:
text_parts.append(elem["textRun"].get("content", ""))
elif "table" in element:
for row in element["table"].get("tableRows", []):
for cell in row.get("tableCells", []):
cell_content = cell.get("content", [])
text_parts.append(_extract_text_from_content(cell_content))
return "".join(text_parts)
# ============ Document Output Helper ============
def _make_document_output(file: GoogleDriveFile) -> GoogleDriveFile:
    """Build the ``GoogleDriveFile`` that blocks emit so outputs can be chained.

    The ID, name and credentials ID are copied from ``file``. The MIME type,
    edit URL, icon URL and folder flag are always set to the Google Docs
    values.
    """
    doc_id = file.id
    edit_url = f"https://docs.google.com/document/d/{doc_id}/edit"
    return GoogleDriveFile(
        id=doc_id,
        name=file.name,
        mimeType="application/vnd.google-apps.document",
        url=edit_url,
        iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
        isFolder=False,
        _credentials_id=file.credentials_id,
    )
# ============ Blocks ============
class GoogleDocsReadBlock(Block):
    """Read content from a Google Doc."""

    class Input(BlockSchemaInput):
        # The document whose text will be read.
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc to read",
            allowed_views=["DOCUMENTS"],
        )

    class Output(BlockSchemaOutput):
        text: str = SchemaField(description="Plain text content of the document")
        title: str = SchemaField(description="Document title")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if read failed")

    def __init__(self):
        # Fixture values shared by test_output and test_mock below.
        sample_text = "Hello World\nThis is a test document.\n"
        sample_title = "Test Document"
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="420a2b3c-5db2-4bda-82bc-a68a862a3d55",
            description="Read text content from a Google Doc",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsReadBlock.Input,
            output_schema=GoogleDocsReadBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input={
                "document": {
                    "id": "1abc123def456",
                    "name": "Test Document",
                    "mimeType": "application/vnd.google-apps.document",
                },
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("text", sample_text),
                ("title", sample_title),
                ("document", expected_document),
            ],
            test_mock={
                # Replaces the API call during block tests.
                "_read_document": lambda *args, **kwargs: {
                    "text": sample_text,
                    "title": sample_title,
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Yield ``text``, ``title`` and ``document`` for the selected doc.

        Yields a single ``error`` instead when no document is selected, the
        file fails validation, or reading raises.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        try:
            docs_service = _build_docs_service(credentials)
            # The Google client is synchronous; run it off the event loop.
            payload = await asyncio.to_thread(
                self._read_document, docs_service, document.id
            )
            yield "text", payload["text"]
            yield "title", payload["title"]
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to read document: {str(exc)}"

    def _read_document(self, service, document_id: str) -> dict:
        """Fetch the document and return its plain text and title."""
        fetched = service.documents().get(documentId=document_id).execute()
        body_elements = fetched.get("body", {}).get("content", [])
        return {
            "text": _extract_text_from_content(body_elements),
            "title": fetched.get("title", ""),
        }
class GoogleDocsCreateBlock(Block):
    """Create a new Google Doc."""

    class Input(BlockSchemaInput):
        credentials: GoogleCredentialsInput = GoogleCredentialsField(
            ["https://www.googleapis.com/auth/drive.file"]
        )
        title: str = SchemaField(description="Title for the new document")
        initial_content: str = SchemaField(
            default="",
            description="Optional initial text content",
        )

    class Output(BlockSchemaOutput):
        document: GoogleDriveFile = SchemaField(description="The created document")
        document_id: str = SchemaField(description="ID of the created document")
        document_url: str = SchemaField(description="URL to open the document")
        error: str = SchemaField(description="Error message if creation failed")

    def __init__(self):
        # Fixture values shared by test_output and test_mock below.
        new_doc_url = "https://docs.google.com/document/d/new_doc_123/edit"
        expected_document = GoogleDriveFile(
            id="new_doc_123",
            name="My New Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/new_doc_123/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=TEST_CREDENTIALS_INPUT["id"],
        )
        super().__init__(
            id="d430d941-cf81-4f84-8b19-2e3f670b2fca",
            description="Create a new Google Doc",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsCreateBlock.Input,
            output_schema=GoogleDocsCreateBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input={
                "credentials": TEST_CREDENTIALS_INPUT,
                "title": "My New Document",
                "initial_content": "Hello, this is the initial content.",
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("document", expected_document),
                ("document_id", "new_doc_123"),
                ("document_url", new_doc_url),
            ],
            test_mock={
                # Replaces the API calls during block tests.
                "_create_document": lambda *args, **kwargs: {
                    "document_id": "new_doc_123",
                    "document_url": new_doc_url,
                    "title": "My New Document",
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Create the document and yield ``document``, ``document_id``, ``document_url``.

        Yields a single ``error`` instead when the title is empty or creation
        raises.
        """
        if not input_data.title:
            yield "error", "Document title is required"
            return

        try:
            drive_service = _build_drive_service(credentials)
            docs_service = _build_docs_service(credentials)
            # The Google client is synchronous; run it off the event loop.
            created = await asyncio.to_thread(
                self._create_document,
                drive_service,
                docs_service,
                input_data.title,
                input_data.initial_content,
            )
            new_id = created["document_id"]
            new_url = created["document_url"]
            created_file = GoogleDriveFile(
                id=new_id,
                name=input_data.title,
                mimeType="application/vnd.google-apps.document",
                url=new_url,
                iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
                isFolder=False,
                _credentials_id=input_data.credentials.id,
            )
            yield "document", created_file
            yield "document_id", new_id
            yield "document_url", new_url
        except Exception as exc:
            yield "error", f"Failed to create document: {str(exc)}"

    def _create_document(
        self, drive_service, docs_service, title: str, initial_content: str
    ) -> dict:
        """Create the file through Drive, then optionally insert text through Docs.

        Returns a dict with ``document_id``, ``document_url`` and ``title``.
        """
        created = (
            drive_service.files()
            .create(
                body={
                    "name": title,
                    "mimeType": "application/vnd.google-apps.document",
                }
            )
            .execute()
        )
        document_id = created.get("id")

        if initial_content:
            # Seed the new document by inserting the text at index 1.
            seed_request = {
                "insertText": {
                    "location": {"index": 1},
                    "text": initial_content,
                }
            }
            docs_service.documents().batchUpdate(
                documentId=document_id, body={"requests": [seed_request]}
            ).execute()

        return {
            "document_id": document_id,
            "document_url": f"https://docs.google.com/document/d/{document_id}/edit",
            "title": title,
        }
class GoogleDocsAppendPlainTextBlock(Block):
    """Append plain text to the end of a Google Doc (no formatting)."""

    class Input(BlockSchemaInput):
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc to append to",
            allowed_views=["DOCUMENTS"],
        )
        text: str = SchemaField(
            description="Plain text to append (no formatting applied)"
        )
        add_newline: bool = SchemaField(
            default=True,
            description="Add a newline before the appended text",
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result of the append operation")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if append failed")

    def __init__(self):
        # Document expected back from the block in the test fixture.
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="ddc29d9f-78dc-4682-8787-c8a76f00cf38",
            description="Append plain text to the end of a Google Doc (no formatting applied)",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsAppendPlainTextBlock.Input,
            output_schema=GoogleDocsAppendPlainTextBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input={
                "document": {
                    "id": "1abc123def456",
                    "name": "Test Document",
                    "mimeType": "application/vnd.google-apps.document",
                },
                "text": "This is appended text.",
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("result", {"success": True, "characters_added": 23}),
                ("document", expected_document),
            ],
            test_mock={
                # Replaces the API calls during block tests.
                "_append_text": lambda *args, **kwargs: {
                    "success": True,
                    "characters_added": 23,
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Append the text and yield ``result`` followed by ``document``.

        Yields a single ``error`` instead when no document is selected, the
        file fails validation, or the append raises.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        try:
            docs_service = _build_docs_service(credentials)
            # The Google client is synchronous; run it off the event loop.
            outcome = await asyncio.to_thread(
                self._append_text,
                docs_service,
                document.id,
                input_data.text,
                input_data.add_newline,
            )
            yield "result", outcome
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to append text: {str(exc)}"

    def _append_text(
        self, service, document_id: str, text: str, add_newline: bool
    ) -> dict:
        """Insert ``text`` at the end of the document body.

        When ``add_newline`` is true a newline is placed before the text; the
        reported ``characters_added`` includes that newline.
        """
        insert_at = _get_document_end_index(service, document_id)
        prefix = "\n" if add_newline else ""
        payload = prefix + text
        append_request = {
            "insertText": {
                "location": {"index": insert_at},
                "text": payload,
            }
        }
        service.documents().batchUpdate(
            documentId=document_id, body={"requests": [append_request]}
        ).execute()
        return {"success": True, "characters_added": len(payload)}
class GoogleDocsInsertPlainTextBlock(Block):
    """Insert plain text at a specific position in a Google Doc (no formatting)."""

    class Input(BlockSchemaInput):
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc to insert into",
            allowed_views=["DOCUMENTS"],
        )
        text: str = SchemaField(
            description="Plain text to insert (no formatting applied)"
        )
        index: int = SchemaField(
            default=1,
            description="Position index to insert at (1 = start of document)",
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result of the insert operation")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if insert failed")

    def __init__(self):
        # Document expected back from the block in the test fixture.
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="0443fdbc-ebb0-49a4-a3ea-6ace9c14da22",
            description="Insert plain text at a specific position in a Google Doc (no formatting applied)",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsInsertPlainTextBlock.Input,
            output_schema=GoogleDocsInsertPlainTextBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input={
                "document": {
                    "id": "1abc123def456",
                    "name": "Test Document",
                    "mimeType": "application/vnd.google-apps.document",
                },
                "text": "Inserted text here. ",
                "index": 1,
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("result", {"success": True, "characters_inserted": 20}),
                ("document", expected_document),
            ],
            test_mock={
                # Replaces the API call during block tests.
                "_insert_text": lambda *args, **kwargs: {
                    "success": True,
                    "characters_inserted": 20,
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Insert the text and yield ``result`` followed by ``document``.

        Yields a single ``error`` instead when no document is selected, the
        file fails validation, or the insert raises.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        try:
            docs_service = _build_docs_service(credentials)
            # The Google client is synchronous; run it off the event loop.
            outcome = await asyncio.to_thread(
                self._insert_text,
                docs_service,
                document.id,
                input_data.text,
                input_data.index,
            )
            yield "result", outcome
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to insert text: {str(exc)}"

    def _insert_text(self, service, document_id: str, text: str, index: int) -> dict:
        """Insert ``text`` at ``index``; indexes below 1 are raised to 1."""
        position = max(1, index)
        insert_request = {
            "insertText": {
                "location": {"index": position},
                "text": text,
            }
        }
        service.documents().batchUpdate(
            documentId=document_id, body={"requests": [insert_request]}
        ).execute()
        return {"success": True, "characters_inserted": len(text)}
class GoogleDocsFindReplacePlainTextBlock(Block):
    """Find and replace plain text in a Google Doc (no formatting applied to replacement)."""

    class Input(BlockSchemaInput):
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc",
            allowed_views=["DOCUMENTS"],
        )
        find_text: str = SchemaField(description="Plain text to find")
        replace_text: str = SchemaField(
            description="Plain text to replace with (no formatting applied)"
        )
        match_case: bool = SchemaField(
            default=False,
            description="Match case when finding text",
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result with replacement count")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if operation failed")

    def __init__(self):
        # Document expected back from the block in the test fixture.
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="e5046ee2-b094-418e-a25e-c0f90c91721c",
            description="Find and replace plain text in a Google Doc (no formatting applied to replacement)",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsFindReplacePlainTextBlock.Input,
            output_schema=GoogleDocsFindReplacePlainTextBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input={
                "document": {
                    "id": "1abc123def456",
                    "name": "Test Document",
                    "mimeType": "application/vnd.google-apps.document",
                },
                "find_text": "old text",
                "replace_text": "new text",
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("result", {"success": True, "replacements_made": 3}),
                ("document", expected_document),
            ],
            test_mock={
                # Replaces the API call during block tests.
                "_find_replace": lambda *args, **kwargs: {
                    "success": True,
                    "replacements_made": 3,
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Run the replacement and yield ``result`` followed by ``document``.

        Yields a single ``error`` instead when no document is selected, the
        file fails validation, or the request raises.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        try:
            docs_service = _build_docs_service(credentials)
            # The Google client is synchronous; run it off the event loop.
            outcome = await asyncio.to_thread(
                self._find_replace,
                docs_service,
                document.id,
                input_data.find_text,
                input_data.replace_text,
                input_data.match_case,
            )
            yield "result", outcome
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to find/replace: {str(exc)}"

    def _find_replace(
        self,
        service,
        document_id: str,
        find_text: str,
        replace_text: str,
        match_case: bool,
    ) -> dict:
        """Send one ``replaceAllText`` request and report how many matches changed.

        The count is read from ``occurrencesChanged`` in the first reply and
        is 0 when that reply or field is absent.
        """
        replace_request = {
            "replaceAllText": {
                "containsText": {
                    "text": find_text,
                    "matchCase": match_case,
                },
                "replaceText": replace_text,
            }
        }
        response = (
            service.documents()
            .batchUpdate(documentId=document_id, body={"requests": [replace_request]})
            .execute()
        )

        replies = response.get("replies", [])
        changed = 0
        if replies:
            first_reply = replies[0]
            if "replaceAllText" in first_reply:
                changed = first_reply["replaceAllText"].get("occurrencesChanged", 0)
        return {"success": True, "replacements_made": changed}
class GoogleDocsGetMetadataBlock(Block):
    """Get metadata about a Google Doc."""

    class Input(BlockSchemaInput):
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc",
            allowed_views=["DOCUMENTS"],
        )

    class Output(BlockSchemaOutput):
        title: str = SchemaField(description="Document title")
        document_id: str = SchemaField(description="Document ID")
        revision_id: str = SchemaField(description="Current revision ID")
        document_url: str = SchemaField(description="URL to open the document")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if operation failed")

    def __init__(self):
        # Document expected back from the block in the test fixture.
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="100bc806-acbf-4dc5-a3a2-998026b96516",
            description="Get metadata about a Google Doc",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsGetMetadataBlock.Input,
            output_schema=GoogleDocsGetMetadataBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input={
                "document": {
                    "id": "1abc123def456",
                    "name": "Test Document",
                    "mimeType": "application/vnd.google-apps.document",
                },
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("title", "Test Document"),
                ("document_id", "1abc123def456"),
                ("revision_id", "rev_123"),
                (
                    "document_url",
                    "https://docs.google.com/document/d/1abc123def456/edit",
                ),
                ("document", expected_document),
            ],
            test_mock={
                # Replaces the API call during block tests.
                "_get_metadata": lambda *args, **kwargs: {
                    "title": "Test Document",
                    "revision_id": "rev_123",
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Yield ``title``, ``document_id``, ``revision_id``, ``document_url``, ``document``.

        Yields a single ``error`` instead when no document is selected, the
        file fails validation, or the lookup raises.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        try:
            docs_service = _build_docs_service(credentials)
            # The Google client is synchronous; run it off the event loop.
            metadata = await asyncio.to_thread(
                self._get_metadata, docs_service, document.id
            )
            yield "title", metadata["title"]
            yield "document_id", input_data.document.id
            yield "revision_id", metadata["revision_id"]
            yield "document_url", f"https://docs.google.com/document/d/{input_data.document.id}/edit"
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to get metadata: {str(exc)}"

    def _get_metadata(self, service, document_id: str) -> dict:
        """Fetch the document and return its title and revision ID.

        Either value is an empty string when the field is missing.
        """
        fetched = service.documents().get(documentId=document_id).execute()
        title = fetched.get("title", "")
        revision_id = fetched.get("revisionId", "")
        return {"title": title, "revision_id": revision_id}
class GoogleDocsInsertTableBlock(Block):
"""Insert a table into a Google Doc, optionally with content."""
# Input schema for the table-insert block. `rows`/`columns` only apply when
# `content` carries no non-empty cell; otherwise the block's run() derives the
# table dimensions from `content`.
class Input(BlockSchemaInput):
    document: GoogleDriveFile = GoogleDriveFileField(
        title="Document",
        description="Select a Google Doc",
        allowed_views=["DOCUMENTS"],
    )
    rows: int = SchemaField(
        default=3,
        description="Number of rows (ignored if content provided)",
    )
    columns: int = SchemaField(
        default=3,
        description="Number of columns (ignored if content provided)",
    )
    # Outer list = rows, inner list = the cells of one row.
    content: list[list[str]] = SchemaField(
        default=[],
        description="Optional 2D array of cell content, e.g. [['Header1', 'Header2'], ['Row1Col1', 'Row1Col2']]. If provided, rows/columns are derived from this.",
    )
    # 0 is a sentinel meaning "end of document".
    index: int = SchemaField(
        default=0,
        description="Position to insert table (0 = end of document)",
    )
    format_as_markdown: bool = SchemaField(
        default=False,
        description="Format cell content as Markdown (headers, bold, links, etc.)",
    )
# Output schema for the table-insert block: the dict returned by the insertion
# helper, the document for chaining, and an error message on failure.
class Output(BlockSchemaOutput):
    result: dict = SchemaField(description="Result of table insertion")
    document: GoogleDriveFile = SchemaField(description="The document for chaining")
    error: str = SchemaField(description="Error message if operation failed")
def __init__(self):
    """Register the block's ID, description, schemas and test fixtures."""
    # Document expected back from the block in the test fixture.
    expected_document = GoogleDriveFile(
        id="1abc123def456",
        name="Test Document",
        mimeType="application/vnd.google-apps.document",
        url="https://docs.google.com/document/d/1abc123def456/edit",
        iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
        isFolder=False,
        _credentials_id=None,
    )
    sample_input = {
        "document": {
            "id": "1abc123def456",
            "name": "Test Document",
            "mimeType": "application/vnd.google-apps.document",
        },
        "content": [["Header1", "Header2"], ["Row1Col1", "Row1Col2"]],
    }
    super().__init__(
        id="e104b3ab-dfef-45f9-9702-14e950988f53",
        description="Insert a table into a Google Doc, optionally with content and Markdown formatting",
        categories={BlockCategory.DATA},
        input_schema=GoogleDocsInsertTableBlock.Input,
        output_schema=GoogleDocsInsertTableBlock.Output,
        disabled=GOOGLE_DOCS_DISABLED,
        test_input=sample_input,
        test_credentials=TEST_CREDENTIALS,
        test_output=[
            (
                "result",
                {
                    "success": True,
                    "rows": 2,
                    "columns": 2,
                    "cells_populated": 4,
                    "cells_found": 4,
                },
            ),
            ("document", expected_document),
        ],
        test_mock={
            # Replaces the API calls during block tests.
            "_insert_table": lambda *args, **kwargs: {
                "success": True,
                "rows": 2,
                "columns": 2,
                "cells_populated": 4,
                "cells_found": 4,
            },
        },
    )
async def run(
    self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
) -> BlockOutput:
    """Insert a table and yield ``result`` followed by ``document``.

    Yields a single ``error`` instead when no document is selected, the file
    fails validation, or the insertion raises.
    """
    document = input_data.document
    if not document:
        yield "error", "No document selected"
        return

    problem = _validate_document_file(document)
    if problem:
        yield "error", problem
        return

    cells = input_data.content
    # Content counts only when some row has cells and at least one cell is
    # non-empty.
    has_any_cells = bool(cells and any(len(row) > 0 for row in cells))
    is_populated = has_any_cells and any(any(row) for row in cells)

    if is_populated:
        # Dimensions come from the content: every supplied row counts
        # (including empty ones) and the widest row sets the column count.
        rows = len(cells)
        columns = max(len(row) for row in cells)
    else:
        # Fall back to the explicit dimensions and pass an empty list so the
        # helper skips populating cells.
        rows = input_data.rows
        columns = input_data.columns
        cells = []

    try:
        docs_service = _build_docs_service(credentials)
        # The Google client is synchronous; run it off the event loop.
        outcome = await asyncio.to_thread(
            self._insert_table,
            docs_service,
            document.id,
            rows,
            columns,
            input_data.index,
            cells,
            input_data.format_as_markdown,
        )
        yield "result", outcome
        yield "document", _make_document_output(document)
    except Exception as exc:
        yield "error", f"Failed to insert table: {str(exc)}"
def _insert_table(
self,
service,
document_id: str,
rows: int,
columns: int,
index: int,
content: list[list[str]],
format_as_markdown: bool,
) -> dict:
"""Insert an empty table into the document and optionally fill its cells.

Args:
    service: Docs API client; used through ``service.documents()``.
    document_id: ID of the document to modify.
    rows: Number of rows passed to the ``insertTable`` request.
    columns: Number of columns passed to the ``insertTable`` request.
    index: Insertion index. ``0`` is replaced by the value returned from
        ``_get_document_end_index``.
    content: Row-major cell text. When falsy the table is left empty.
        Cells outside the bounds of ``content`` and cells whose text is
        empty are skipped.
    format_as_markdown: When true, each cell's text is converted with
        ``to_requests`` and sent in its own ``batchUpdate``; otherwise all
        cells are written as plain ``insertText`` requests in one
        ``batchUpdate``.

Returns:
    A dict with ``success``, ``rows`` and ``columns``. It additionally
    carries ``warning`` when the table or its cells could not be located
    after creation, or ``cells_populated`` and ``cells_found`` once the
    population step has run.
"""
# If index is 0, insert at end of document
if index == 0:
index = _get_document_end_index(service, document_id)
# Insert the empty table structure
requests = [
{
"insertTable": {
"rows": rows,
"columns": columns,
"location": {"index": index},
}
}
]
service.documents().batchUpdate(
documentId=document_id, body={"requests": requests}
).execute()
# If no content provided, we're done
if not content:
return {"success": True, "rows": rows, "columns": columns}
# Fetch the document to find cell indexes
doc = service.documents().get(documentId=document_id).execute()
body_content = doc.get("body", {}).get("content", [])
# Collect every table in the body (in document order) so the one that was
# just inserted can be picked out below.
tables_found = []
for element in body_content:
if "table" in element:
tables_found.append(element)
if not tables_found:
# The table was created, so this is still reported as a success.
return {
"success": True,
"rows": rows,
"columns": columns,
"warning": "Table created but could not find it to populate",
}
# If we inserted at end (index was high), take the last table
# Otherwise, take the first table at or after our insert index
table_element = None
# Heuristic: rows * columns * 2 estimates the minimum index space a table
# occupies (each cell has at least a start index and structural overhead).
# This helps determine if our insert point was near the document end.
estimated_table_size = rows * columns * 2
# NOTE(review): this end index is requested again after the table has been
# inserted, so it reflects the document including the new table.
if (
index
>= _get_document_end_index(service, document_id) - estimated_table_size
):
# Likely inserted at end - use last table
table_element = tables_found[-1]
else:
for tbl in tables_found:
if tbl.get("startIndex", 0) >= index:
table_element = tbl
break
# Fall back to the last table when nothing starts at or after the index.
if not table_element:
table_element = tables_found[-1]
# Extract cell start indexes from the table structure
# Structure: table -> tableRows -> tableCells -> content[0] -> startIndex
cell_positions: list[tuple[int, int, int]] = [] # (row, col, start_index)
table_data = table_element.get("table", {})
table_rows_list = table_data.get("tableRows", [])
for row_idx, table_row in enumerate(table_rows_list):
cells = table_row.get("tableCells", [])
for col_idx, cell in enumerate(cells):
cell_content = cell.get("content", [])
if cell_content:
# Get the start index of the first element in the cell
first_element = cell_content[0]
cell_start = first_element.get("startIndex")
if cell_start is not None:
cell_positions.append((row_idx, col_idx, cell_start))
if not cell_positions:
return {
"success": True,
"rows": rows,
"columns": columns,
"warning": f"Table created but could not extract cell positions. Table has {len(table_rows_list)} rows.",
}
# Sort by index descending so we can insert in reverse order
# (inserting later content first preserves earlier indexes)
cell_positions.sort(key=lambda x: x[2], reverse=True)
cells_populated = 0
if format_as_markdown:
# Markdown formatting: process each cell individually since
# gravitas-md2gdocs requests may have complex interdependencies
for row_idx, col_idx, cell_start in cell_positions:
# Skip cells that have no corresponding entry in ``content``.
if row_idx < len(content) and col_idx < len(content[row_idx]):
cell_text = content[row_idx][col_idx]
if not cell_text:
continue
md_requests = to_requests(cell_text, start_index=cell_start)
if md_requests:
service.documents().batchUpdate(
documentId=document_id, body={"requests": md_requests}
).execute()
cells_populated += 1
else:
# Plain text: batch all insertions into a single API call
# Cells are sorted by index descending, so earlier requests
# don't affect indices of later ones
all_text_requests = []
for row_idx, col_idx, cell_start in cell_positions:
# Skip cells that have no corresponding entry in ``content``.
if row_idx < len(content) and col_idx < len(content[row_idx]):
cell_text = content[row_idx][col_idx]
if not cell_text:
continue
all_text_requests.append(
{
"insertText": {
"location": {"index": cell_start},
"text": cell_text,
}
}
)
cells_populated += 1
# Avoid sending an empty batch when every cell was skipped.
if all_text_requests:
service.documents().batchUpdate(
documentId=document_id, body={"requests": all_text_requests}
).execute()
return {
"success": True,
"rows": rows,
"columns": columns,
"cells_populated": cells_populated,
"cells_found": len(cell_positions),
}
class GoogleDocsInsertPageBreakBlock(Block):
    """Insert a page break into a Google Doc.

    An ``index`` of 0 means "end of document"; any other value is used as
    the insertion index unchanged.
    """

    class Input(BlockSchemaInput):
        # Target document, restricted to the Docs picker view.
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc",
            allowed_views=["DOCUMENTS"],
        )
        # 0 is a sentinel resolved to the document end at run time.
        index: int = SchemaField(
            description="Position to insert page break (0 = end of document)",
            default=0,
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result of page break insertion")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if operation failed")

    def __init__(self):
        sample_input = {
            "document": {
                "id": "1abc123def456",
                "name": "Test Document",
                "mimeType": "application/vnd.google-apps.document",
            },
        }
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="f199e674-803b-4ee8-8bbf-172e6512190b",
            description="Insert a page break into a Google Doc",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsInsertPageBreakBlock.Input,
            output_schema=GoogleDocsInsertPageBreakBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input=sample_input,
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("result", {"success": True}),
                ("document", expected_document),
            ],
            test_mock={
                "_insert_page_break": lambda *args, **kwargs: {"success": True},
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Validate the document, then insert the page break in a worker thread.

        Yields ``("error", message)`` on a missing or invalid document or on
        any exception; otherwise ``("result", dict)`` and ``("document", file)``.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        try:
            docs_service = _build_docs_service(credentials)
            outcome = await asyncio.to_thread(
                self._insert_page_break,
                docs_service,
                document.id,
                input_data.index,
            )
            yield "result", outcome
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to insert page break: {exc!s}"

    def _insert_page_break(self, service, document_id: str, index: int) -> dict:
        """Send a single ``insertPageBreak`` request and report success.

        ``index == 0`` is replaced by the value returned from
        ``_get_document_end_index``.
        """
        target = (
            _get_document_end_index(service, document_id) if index == 0 else index
        )
        body = {"requests": [{"insertPageBreak": {"location": {"index": target}}}]}
        service.documents().batchUpdate(documentId=document_id, body=body).execute()
        return {"success": True}
class GoogleDocsDeleteContentBlock(Block):
    """Delete a range of content from a Google Doc.

    The range is ``start_index`` up to ``end_index``; ``start_index`` must
    be strictly smaller than ``end_index``.
    """

    class Input(BlockSchemaInput):
        # Target document, restricted to the Docs picker view.
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc",
            allowed_views=["DOCUMENTS"],
        )
        # Lower bound of the range; schema-constrained to be at least 1.
        start_index: int = SchemaField(
            ge=1,
            description="Start index of content to delete (must be >= 1, as index 0 is a section break)",
        )
        end_index: int = SchemaField(description="End index of content to delete")

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result of delete operation")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if operation failed")

    def __init__(self):
        sample_input = {
            "document": {
                "id": "1abc123def456",
                "name": "Test Document",
                "mimeType": "application/vnd.google-apps.document",
            },
            "start_index": 10,
            "end_index": 50,
        }
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="5f9f9fa4-9071-4028-97c2-9d15fb422dc5",
            description="Delete a range of content from a Google Doc",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsDeleteContentBlock.Input,
            output_schema=GoogleDocsDeleteContentBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input=sample_input,
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("result", {"success": True, "characters_deleted": 40}),
                ("document", expected_document),
            ],
            test_mock={
                "_delete_content": lambda *args, **kwargs: {
                    "success": True,
                    "characters_deleted": 40,
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Validate the document and range, then delete in a worker thread.

        Yields ``("error", message)`` on a missing or invalid document, on an
        empty or inverted range, or on any exception; otherwise
        ``("result", dict)`` and ``("document", file)``.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        first, last = input_data.start_index, input_data.end_index
        if not first < last:
            yield "error", "Start index must be less than end index"
            return

        try:
            docs_service = _build_docs_service(credentials)
            outcome = await asyncio.to_thread(
                self._delete_content,
                docs_service,
                document.id,
                first,
                last,
            )
            yield "result", outcome
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to delete content: {exc!s}"

    def _delete_content(
        self, service, document_id: str, start_index: int, end_index: int
    ) -> dict:
        """Send one ``deleteContentRange`` request for the given range.

        Returns ``success`` plus ``characters_deleted``, which is computed as
        ``end_index - start_index`` rather than read from the API response.
        """
        span = {"startIndex": start_index, "endIndex": end_index}
        body = {"requests": [{"deleteContentRange": {"range": span}}]}
        service.documents().batchUpdate(documentId=document_id, body=body).execute()
        return {"success": True, "characters_deleted": end_index - start_index}
class ExportFormat(str, Enum):
    # Export targets for a Google Doc. Each member's value is the MIME type
    # string that is handed to the Drive export call as ``mimeType``.

    # Binary formats (returned base64-encoded by the export block).
    PDF = "application/pdf"
    DOCX = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    ODT = "application/vnd.oasis.opendocument.text"

    # Text formats (returned as decoded strings by the export block).
    TXT = "text/plain"
    HTML = "text/html"

    # Remaining binary formats.
    EPUB = "application/epub+zip"
    RTF = "application/rtf"
class GoogleDocsExportBlock(Block):
    """Export a Google Doc to various formats.

    Plain-text and HTML exports are returned as strings; every other format
    is returned base64-encoded.
    """

    class Input(BlockSchemaInput):
        # Document to export, restricted to the Docs picker view.
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc to export",
            allowed_views=["DOCUMENTS"],
        )
        # Target format; the enum value is the MIME type sent to Drive.
        format: ExportFormat = SchemaField(
            description="Export format",
            default=ExportFormat.PDF,
        )

    class Output(BlockSchemaOutput):
        content: str = SchemaField(
            description="Exported content (base64 encoded for binary formats)"
        )
        mime_type: str = SchemaField(description="MIME type of exported content")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if export failed")

    def __init__(self):
        sample_input = {
            "document": {
                "id": "1abc123def456",
                "name": "Test Document",
                "mimeType": "application/vnd.google-apps.document",
            },
            "format": ExportFormat.TXT,
        }
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="e32d5642-7b51-458c-bd83-75ff96fec299",
            description="Export a Google Doc to PDF, Word, text, or other formats",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsExportBlock.Input,
            output_schema=GoogleDocsExportBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input=sample_input,
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("content", "This is the document content as plain text."),
                ("mime_type", "text/plain"),
                ("document", expected_document),
            ],
            test_mock={
                "_export_document": lambda *args, **kwargs: {
                    "content": "This is the document content as plain text.",
                    "mime_type": "text/plain",
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Validate the document, then export it through the Drive service.

        Yields ``("error", message)`` on a missing or invalid document or on
        any exception; otherwise ``content``, ``mime_type`` and ``document``.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        try:
            drive_service = _build_drive_service(credentials)
            exported = await asyncio.to_thread(
                self._export_document,
                drive_service,
                document.id,
                input_data.format.value,
            )
            yield "content", exported["content"]
            yield "mime_type", exported["mime_type"]
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to export document: {exc!s}"

    def _export_document(self, service, document_id: str, mime_type: str) -> dict:
        """Run ``files().export`` and normalise the payload to a string.

        Returns a dict with ``content`` (text, or base64 for non-text MIME
        types) and the ``mime_type`` that was requested.
        """
        import base64

        payload = (
            service.files().export(fileId=document_id, mimeType=mime_type).execute()
        )
        if mime_type in ("text/plain", "text/html"):
            # Text formats: decode bytes, pass an existing str through as-is.
            content = payload.decode("utf-8") if isinstance(payload, bytes) else payload
        else:
            # Everything else is treated as binary and made string-safe.
            content = base64.b64encode(payload).decode("utf-8")
        return {"content": content, "mime_type": mime_type}
class GoogleDocsFormatTextBlock(Block):
    """Apply formatting to text in a Google Doc.

    Only the options that are switched on (or non-default) are sent; options
    left at their defaults are not touched in the document.
    """

    class Input(BlockSchemaInput):
        # Target document, restricted to the Docs picker view.
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc",
            allowed_views=["DOCUMENTS"],
        )
        # Lower bound of the range; schema-constrained to be at least 1.
        start_index: int = SchemaField(
            ge=1,
            description="Start index of text to format (must be >= 1, as index 0 is a section break)",
        )
        end_index: int = SchemaField(description="End index of text to format")
        bold: bool = SchemaField(
            description="Make text bold",
            default=False,
        )
        italic: bool = SchemaField(
            description="Make text italic",
            default=False,
        )
        underline: bool = SchemaField(
            description="Underline text",
            default=False,
        )
        # 0 (or any non-positive value) leaves the font size untouched.
        font_size: int = SchemaField(
            description="Font size in points (0 = no change)",
            default=0,
        )
        # Empty string leaves the colour untouched.
        foreground_color: str = SchemaField(
            description="Text color as hex (e.g., #FF0000 for red)",
            default="",
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result of format operation")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if operation failed")

    def __init__(self):
        sample_input = {
            "document": {
                "id": "1abc123def456",
                "name": "Test Document",
                "mimeType": "application/vnd.google-apps.document",
            },
            "start_index": 2,
            "end_index": 10,
            "bold": True,
        }
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="04c38a7e-7ee5-4e1a-86c1-9727123577bc",
            description="Apply formatting (bold, italic, color, etc.) to text in a Google Doc",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsFormatTextBlock.Input,
            output_schema=GoogleDocsFormatTextBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input=sample_input,
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("result", {"success": True}),
                ("document", expected_document),
            ],
            test_mock={
                "_format_text": lambda *args, **kwargs: {"success": True},
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Validate the document and range, then apply the formatting.

        Yields ``("error", message)`` on a missing or invalid document, on an
        empty or inverted range, or on any exception; otherwise
        ``("result", dict)`` and ``("document", file)``.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        if not input_data.start_index < input_data.end_index:
            yield "error", "Start index must be less than end index"
            return

        try:
            docs_service = _build_docs_service(credentials)
            outcome = await asyncio.to_thread(
                self._format_text,
                docs_service,
                document.id,
                input_data.start_index,
                input_data.end_index,
                input_data.bold,
                input_data.italic,
                input_data.underline,
                input_data.font_size,
                input_data.foreground_color,
            )
            yield "result", outcome
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to format text: {exc!s}"

    def _format_text(
        self,
        service,
        document_id: str,
        start_index: int,
        end_index: int,
        bold: bool,
        italic: bool,
        underline: bool,
        font_size: int,
        foreground_color: str,
    ) -> dict:
        """Build and send one ``updateTextStyle`` request for the range.

        Returns ``{"success": True}`` after a request was sent, with a
        ``warning`` added when an unparsable colour was dropped. When the
        colour is unparsable and nothing else was requested, returns
        ``success: False`` with a ``message``. When no option is set at all,
        returns success with a ``message`` and sends nothing.
        """
        text_style: dict[str, Any] = {}
        fields: list[str] = []

        # The three boolean toggles share one shape: set the key, list the field.
        for style_key, enabled in (
            ("bold", bold),
            ("italic", italic),
            ("underline", underline),
        ):
            if enabled:
                text_style[style_key] = True
                fields.append(style_key)

        if font_size > 0:
            text_style["fontSize"] = {"magnitude": font_size, "unit": "PT"}
            fields.append("fontSize")

        warning = None
        if foreground_color:
            rgb = _parse_hex_color_to_rgb_floats(foreground_color)
            if rgb is not None:
                red, green, blue = rgb
                text_style["foregroundColor"] = {
                    "color": {"rgbColor": {"red": red, "green": green, "blue": blue}}
                }
                fields.append("foregroundColor")
            elif not fields:
                # The bad colour was the only thing asked for: report failure.
                return {
                    "success": False,
                    "message": (
                        f"Invalid foreground_color: {foreground_color!r}. "
                        "Expected hex like #RGB or #RRGGBB."
                    ),
                }
            else:
                # Other options are valid, so apply them and only warn about
                # the colour instead of failing the whole operation.
                warning = (
                    f"Ignored invalid foreground_color: {foreground_color!r}. "
                    "Expected hex like #RGB or #RRGGBB."
                )

        if not fields:
            return {"success": True, "message": "No formatting options specified"}

        style_request = {
            "updateTextStyle": {
                "range": {"startIndex": start_index, "endIndex": end_index},
                "textStyle": text_style,
                "fields": ",".join(fields),
            }
        }
        service.documents().batchUpdate(
            documentId=document_id, body={"requests": [style_request]}
        ).execute()

        outcome: dict[str, Any] = {"success": True}
        if warning:
            outcome["warning"] = warning
        return outcome
class GoogleDocsShareBlock(Block):
    """Share a Google Doc with specific users.

    With an email address the document is shared with that user; with an
    empty email an "anyone" permission is created instead (link sharing).
    """

    class Input(BlockSchemaInput):
        # Document to share, restricted to the Docs picker view.
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc to share",
            allowed_views=["DOCUMENTS"],
        )
        # Empty string switches the block to link sharing.
        email: str = SchemaField(
            description="Email address to share with. Leave empty for link sharing.",
            default="",
        )
        role: ShareRole = SchemaField(
            description="Permission role for the user",
            default=ShareRole.READER,
        )
        send_notification: bool = SchemaField(
            description="Send notification email to the user",
            default=True,
        )
        message: str = SchemaField(
            description="Optional message to include in notification email",
            default="",
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result of the share operation")
        share_link: str = SchemaField(description="Link to the document")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if share failed")

    def __init__(self):
        sample_input = {
            "document": {
                "id": "1abc123def456",
                "name": "Test Document",
                "mimeType": "application/vnd.google-apps.document",
            },
            "email": "test@example.com",
            "role": "reader",
        }
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="4e7ec771-4cc8-4eb7-ae3d-46377ecdb5d2",
            description="Share a Google Doc with specific users",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsShareBlock.Input,
            output_schema=GoogleDocsShareBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input=sample_input,
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("result", {"success": True}),
                ("share_link", "https://docs.google.com/document/d/1abc123def456/edit"),
                ("document", expected_document),
            ],
            test_mock={
                "_share_document": lambda *args, **kwargs: {
                    "success": True,
                    "share_link": "https://docs.google.com/document/d/1abc123def456/edit",
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Validate the document, then create the permission via Drive.

        Yields ``("error", message)`` on a missing or invalid document or on
        any exception; otherwise ``result``, ``share_link`` and ``document``.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        try:
            drive_service = _build_drive_service(credentials)
            shared = await asyncio.to_thread(
                self._share_document,
                drive_service,
                document.id,
                input_data.email,
                input_data.role,
                input_data.send_notification,
                input_data.message,
            )
            # The result output is a fixed success marker; only the link is
            # taken from the helper's return value.
            yield "result", {"success": True}
            yield "share_link", shared["share_link"]
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to share document: {exc!s}"

    def _share_document(
        self,
        service,
        document_id: str,
        email: str,
        role: ShareRole,
        send_notification: bool,
        message: str,
    ) -> dict:
        """Create one Drive permission and return the document's edit link.

        A non-empty ``email`` creates a ``user`` permission (forwarding the
        notification flag and, when given, the message); an empty ``email``
        creates an ``anyone`` permission.
        """
        create_args: dict[str, Any]
        if email:
            # Share with one specific user.
            create_args = {
                "fileId": document_id,
                "body": {"type": "user", "role": role.value, "emailAddress": email},
                "sendNotificationEmail": send_notification,
            }
            if message:
                create_args["emailMessage"] = message
        else:
            # "Anyone with the link" permission for link sharing.
            create_args = {
                "fileId": document_id,
                "body": {"type": "anyone", "role": role.value},
            }
        service.permissions().create(**create_args).execute()
        return {
            "success": True,
            "share_link": f"https://docs.google.com/document/d/{document_id}/edit",
        }
class GoogleDocsSetPublicAccessBlock(Block):
    """Make a Google Doc publicly accessible or private.

    Making a document public creates an "anyone" permission with the chosen
    role. Making it private deletes every "anyone" permission on the file.
    """

    class Input(BlockSchemaInput):
        # Target document, restricted to the Docs picker view.
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc",
            allowed_views=["DOCUMENTS"],
        )
        public: bool = SchemaField(
            default=True,
            description="True to make public, False to make private",
        )
        # Only used when ``public`` is true.
        role: PublicAccessRole = SchemaField(
            default=PublicAccessRole.READER,
            description="Permission role for public access",
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result of the operation")
        share_link: str = SchemaField(description="Link to the document")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if operation failed")

    def __init__(self):
        super().__init__(
            id="d104f6e1-80af-4fe9-b5a1-3cab20081b6c",
            description="Make a Google Doc public or private",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsSetPublicAccessBlock.Input,
            output_schema=GoogleDocsSetPublicAccessBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input={
                "document": {
                    "id": "1abc123def456",
                    "name": "Test Document",
                    "mimeType": "application/vnd.google-apps.document",
                },
                "public": True,
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("result", {"success": True, "is_public": True}),
                (
                    "share_link",
                    "https://docs.google.com/document/d/1abc123def456/edit?usp=sharing",
                ),
                (
                    "document",
                    GoogleDriveFile(
                        id="1abc123def456",
                        name="Test Document",
                        mimeType="application/vnd.google-apps.document",
                        url="https://docs.google.com/document/d/1abc123def456/edit",
                        iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
                        isFolder=False,
                        _credentials_id=None,
                    ),
                ),
            ],
            test_mock={
                "_set_public_access": lambda *args, **kwargs: {
                    "success": True,
                    "is_public": True,
                    "share_link": "https://docs.google.com/document/d/1abc123def456/edit?usp=sharing",
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Validate the document, then switch its public access via Drive.

        Yields ``("error", message)`` on a missing or invalid document or on
        any exception; otherwise ``result``, ``share_link`` and ``document``.
        """
        if not input_data.document:
            yield "error", "No document selected"
            return
        validation_error = _validate_document_file(input_data.document)
        if validation_error:
            yield "error", validation_error
            return
        try:
            service = _build_drive_service(credentials)
            result = await asyncio.to_thread(
                self._set_public_access,
                service,
                input_data.document.id,
                input_data.public,
                input_data.role,
            )
            yield "result", {"success": True, "is_public": result["is_public"]}
            yield "share_link", result["share_link"]
            yield "document", _make_document_output(input_data.document)
        except Exception as e:
            yield "error", f"Failed to set public access: {str(e)}"

    def _set_public_access(
        self, service, document_id: str, public: bool, role: PublicAccessRole
    ) -> dict:
        """Create or remove the "anyone" permission on the document.

        Args:
            service: Drive API client; used through ``service.permissions()``.
            document_id: ID of the file whose permissions are changed.
            public: True to add an "anyone" permission, False to remove all
                of them.
            role: Role given to the "anyone" permission when ``public``.

        Returns:
            A dict with ``success``, ``is_public`` (echoing ``public``) and
            ``share_link``; the link carries ``?usp=sharing`` only when the
            document was made public.
        """
        share_link = f"https://docs.google.com/document/d/{document_id}/edit"
        if public:
            permission = {"type": "anyone", "role": role.value}
            service.permissions().create(fileId=document_id, body=permission).execute()
            share_link += "?usp=sharing"
        else:
            # The permission list can be paginated, so follow nextPageToken
            # until it is exhausted; reading only the first page could leave
            # an "anyone" permission in place while reporting the document
            # as private.
            anyone_permission_ids: list[str] = []
            page_token = None
            while True:
                list_kwargs: dict[str, Any] = {"fileId": document_id}
                if page_token:
                    list_kwargs["pageToken"] = page_token
                page = service.permissions().list(**list_kwargs).execute()
                anyone_permission_ids.extend(
                    perm["id"]
                    for perm in page.get("permissions", [])
                    if perm.get("type") == "anyone"
                )
                page_token = page.get("nextPageToken")
                if not page_token:
                    break
            # Delete only after listing is complete so the deletions cannot
            # disturb the pagination of the list being read.
            for permission_id in anyone_permission_ids:
                service.permissions().delete(
                    fileId=document_id, permissionId=permission_id
                ).execute()
        return {"success": True, "is_public": public, "share_link": share_link}
# ============ Markdown Blocks ============
class GoogleDocsAppendMarkdownBlock(Block):
    """Append Markdown content to the end of a Google Doc.

    The Markdown is converted to Google Docs formatting. Supported elements:
    - Headers (H1-H6)
    - Bold, italic, strikethrough
    - Inline code and code blocks
    - Links
    - Bulleted and numbered lists
    - Blockquotes

    Well suited to AI agents that produce Markdown output.
    """

    class Input(BlockSchemaInput):
        # Document to append to, restricted to the Docs picker view.
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc to append to",
            allowed_views=["DOCUMENTS"],
        )
        markdown: str = SchemaField(
            description="Markdown content to append to the document"
        )
        # Separates the new content from whatever the document already holds.
        add_newline: bool = SchemaField(
            description="Add a newline before the appended content",
            default=True,
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result of the append operation")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if operation failed")

    def __init__(self):
        sample_input = {
            "document": {
                "id": "1abc123def456",
                "name": "Test Document",
                "mimeType": "application/vnd.google-apps.document",
            },
            "markdown": "# Hello World\n\nThis is **bold** and *italic* text.\n\n- Item 1\n- Item 2",
        }
        expected_document = GoogleDriveFile(
            id="1abc123def456",
            name="Test Document",
            mimeType="application/vnd.google-apps.document",
            url="https://docs.google.com/document/d/1abc123def456/edit",
            iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
            isFolder=False,
            _credentials_id=None,
        )
        super().__init__(
            id="60854b69-ecbd-4188-bd89-f7966a4d3b38",
            description="Append Markdown content to the end of a Google Doc with full formatting - ideal for LLM/AI output",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsAppendMarkdownBlock.Input,
            output_schema=GoogleDocsAppendMarkdownBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input=sample_input,
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("result", {"success": True, "requests_count": 5}),
                ("document", expected_document),
            ],
            test_mock={
                "_append_markdown": lambda *args, **kwargs: {
                    "success": True,
                    "requests_count": 5,
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Validate the inputs, then append the Markdown in a worker thread.

        Yields ``("error", message)`` on a missing or invalid document, on
        empty Markdown, or on any exception; otherwise ``("result", dict)``
        and ``("document", file)``.
        """
        document = input_data.document
        if not document:
            yield "error", "No document selected"
            return

        problem = _validate_document_file(document)
        if problem:
            yield "error", problem
            return

        if not input_data.markdown:
            yield "error", "No markdown content provided"
            return

        try:
            docs_service = _build_docs_service(credentials)
            outcome = await asyncio.to_thread(
                self._append_markdown,
                docs_service,
                document.id,
                input_data.markdown,
                input_data.add_newline,
            )
            yield "result", outcome
            yield "document", _make_document_output(document)
        except Exception as exc:
            yield "error", f"Failed to append markdown: {exc!s}"

    def _append_markdown(
        self,
        service,
        document_id: str,
        markdown: str,
        add_newline: bool,
    ) -> dict:
        """Convert the Markdown and insert it at the end of the document.

        When ``add_newline`` is set and the end index is greater than 1, a
        newline is inserted first in its own ``batchUpdate`` and the insert
        position is moved past it. Returns ``success`` plus
        ``requests_count``, the number of converted Markdown requests.
        """
        insert_at = _get_document_end_index(service, document_id)

        if add_newline and insert_at > 1:
            separator = {"insertText": {"location": {"index": insert_at}, "text": "\n"}}
            service.documents().batchUpdate(
                documentId=document_id, body={"requests": [separator]}
            ).execute()
            insert_at += 1

        md_requests = to_requests(markdown, start_index=insert_at)
        if md_requests:
            service.documents().batchUpdate(
                documentId=document_id, body={"requests": md_requests}
            ).execute()
        return {"success": True, "requests_count": len(md_requests)}
class GoogleDocsReplaceAllWithMarkdownBlock(Block):
    """Replace entire Google Doc content with Markdown.

    Clears the document and inserts formatted Markdown content.
    Supports headers, bold, italic, lists, links, code blocks, etc.
    The clear and the insert are sent as one ``batchUpdate``.
    """

    class Input(BlockSchemaInput):
        # Document whose content is replaced, restricted to the Docs picker view.
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc to replace content in",
            allowed_views=["DOCUMENTS"],
        )
        markdown: str = SchemaField(
            description="Markdown content to replace the document with"
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result of the replace operation")
        document: GoogleDriveFile = SchemaField(description="The document for chaining")
        error: str = SchemaField(description="Error message if operation failed")

    def __init__(self):
        super().__init__(
            id="b6cfb2de-5f0b-437c-b29d-45aebbda9c00",
            description="Replace entire Google Doc content with formatted Markdown - ideal for LLM/AI output",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsReplaceAllWithMarkdownBlock.Input,
            output_schema=GoogleDocsReplaceAllWithMarkdownBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input={
                "document": {
                    "id": "1abc123def456",
                    "name": "Test Document",
                    "mimeType": "application/vnd.google-apps.document",
                },
                "markdown": "# New Document\n\nThis replaces everything.",
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                ("result", {"success": True, "requests_count": 3}),
                (
                    "document",
                    GoogleDriveFile(
                        id="1abc123def456",
                        name="Test Document",
                        mimeType="application/vnd.google-apps.document",
                        url="https://docs.google.com/document/d/1abc123def456/edit",
                        iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
                        isFolder=False,
                        _credentials_id=None,
                    ),
                ),
            ],
            test_mock={
                "_replace_all_with_markdown": lambda *args, **kwargs: {
                    "success": True,
                    "requests_count": 3,
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Validate the inputs, then replace the document body.

        Yields ``("error", message)`` on a missing or invalid document, on
        empty Markdown, or on any exception; otherwise ``("result", dict)``
        and ``("document", file)``.
        """
        if not input_data.document:
            yield "error", "No document selected"
            return
        validation_error = _validate_document_file(input_data.document)
        if validation_error:
            yield "error", validation_error
            return
        if not input_data.markdown:
            yield "error", "No markdown content provided"
            return
        try:
            service = _build_docs_service(credentials)
            result = await asyncio.to_thread(
                self._replace_all_with_markdown,
                service,
                input_data.document.id,
                input_data.markdown,
            )
            yield "result", result
            yield "document", _make_document_output(input_data.document)
        except Exception as e:
            yield "error", f"Failed to replace document with markdown: {str(e)}"

    def _replace_all_with_markdown(
        self,
        service,
        document_id: str,
        markdown: str,
    ) -> dict:
        """Clear the document and insert the converted Markdown in one batch.

        Args:
            service: Docs API client; used through ``service.documents()``.
            document_id: ID of the document to rewrite.
            markdown: Markdown source converted with ``to_requests``.

        Returns:
            A dict with ``success`` and ``requests_count``, the number of
            converted Markdown requests (the delete request is not counted).
        """
        # Convert first: if the Markdown cannot be converted, nothing has
        # been deleted yet.
        markdown_requests = to_requests(markdown, start_index=1)

        batch: list = []
        doc_end = _get_document_end_index(service, document_id)
        if doc_end > 1:
            # Delete all existing content.
            batch.append(
                {
                    "deleteContentRange": {
                        "range": {"startIndex": 1, "endIndex": doc_end}
                    }
                }
            )
        # Requests in a batch are applied in order, so the Markdown is
        # inserted at index 1 of the already-cleared body.
        batch.extend(markdown_requests)

        if batch:
            # One batchUpdate for delete + insert: a rejected insert no
            # longer leaves the document wiped by an earlier, separate call.
            service.documents().batchUpdate(
                documentId=document_id, body={"requests": batch}
            ).execute()
        return {"success": True, "requests_count": len(markdown_requests)}
class GoogleDocsInsertMarkdownAtBlock(Block):
"""Insert Markdown content at a specific position in a Google Doc.
Converts Markdown to Google Docs formatting and inserts at the specified index.
"""
class Input(BlockSchemaInput):
    # Document to insert into, restricted to the Docs picker view.
    document: GoogleDriveFile = GoogleDriveFileField(
        title="Document",
        description="Select a Google Doc to insert into",
        allowed_views=["DOCUMENTS"],
    )
    # Markdown source to convert and insert.
    markdown: str = SchemaField(description="Markdown content to insert")
    # Insertion index; schema-constrained to be at least 1.
    index: int = SchemaField(
        ge=1,
        default=1,
        description="Position index to insert at (1 = start of document)",
    )
class Output(BlockSchemaOutput):
    # Dict produced by the insert helper.
    result: dict = SchemaField(description="Result of the insert operation")
    # The input document, passed through so blocks can be chained.
    document: GoogleDriveFile = SchemaField(description="The document for chaining")
    # Populated instead of the outputs above when validation or the API call fails.
    error: str = SchemaField(description="Error message if operation failed")
def __init__(self):
super().__init__(
id="76e94b04-e02f-4981-8cb8-47ece1be18b4",
description="Insert formatted Markdown at a specific position in a Google Doc - ideal for LLM/AI output",
categories={BlockCategory.DATA},
input_schema=GoogleDocsInsertMarkdownAtBlock.Input,
output_schema=GoogleDocsInsertMarkdownAtBlock.Output,
disabled=GOOGLE_DOCS_DISABLED,
test_input={
"document": {
"id": "1abc123def456",
"name": "Test Document",
"mimeType": "application/vnd.google-apps.document",
},
"markdown": "## Inserted Section\n\nThis was inserted.",
"index": 1,
},
test_credentials=TEST_CREDENTIALS,
test_output=[
("result", {"success": True, "requests_count": 3}),
(
"document",
GoogleDriveFile(
id="1abc123def456",
name="Test Document",
mimeType="application/vnd.google-apps.document",
url="https://docs.google.com/document/d/1abc123def456/edit",
iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
isFolder=False,
_credentials_id=None,
),
),
],
test_mock={
"_insert_markdown_at": lambda *args, **kwargs: {
"success": True,
"requests_count": 3,
},
},
)
async def run(
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
) -> BlockOutput:
if not input_data.document:
yield "error", "No document selected"
return
validation_error = _validate_document_file(input_data.document)
if validation_error:
yield "error", validation_error
return
if not input_data.markdown:
yield "error", "No markdown content provided"
return
try:
service = _build_docs_service(credentials)
result = await asyncio.to_thread(
self._insert_markdown_at,
service,
input_data.document.id,
input_data.markdown,
input_data.index,
)
yield "result", result
yield "document", _make_document_output(input_data.document)
except Exception as e:
yield "error", f"Failed to insert markdown: {str(e)}"
def _insert_markdown_at(
self,
service,
document_id: str,
markdown: str,
index: int,
) -> dict:
requests = to_requests(markdown, start_index=index)
if requests:
service.documents().batchUpdate(
documentId=document_id, body={"requests": requests}
).execute()
return {"success": True, "requests_count": len(requests)}
class GoogleDocsReplaceRangeWithMarkdownBlock(Block):
    """Replace a specific range (by index) in a Google Doc with Markdown.

    Deletes content between start and end indices, then inserts formatted
    Markdown at the start index. Both steps are sent as one batch update so
    the document is never left with the range deleted but nothing inserted.
    """

    class Input(BlockSchemaInput):
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc",
            allowed_views=["DOCUMENTS"],
        )
        markdown: str = SchemaField(
            description="Markdown content to insert in place of the range"
        )
        start_index: int = SchemaField(
            description="Start index of the range to replace (must be >= 1)",
            ge=1,
        )
        end_index: int = SchemaField(
            description="End index of the range to replace",
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result of the replace operation")
        document: GoogleDriveFile = SchemaField(
            description="The document for chaining"
        )
        error: str = SchemaField(description="Error message if operation failed")

    def __init__(self):
        """Register the block's metadata, schemas and self-test fixtures."""
        super().__init__(
            id="9e43a905-a918-4da0-8874-dfddd3c46953",
            description="Replace a specific index range in a Google Doc with formatted Markdown - ideal for LLM/AI output",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsReplaceRangeWithMarkdownBlock.Input,
            output_schema=GoogleDocsReplaceRangeWithMarkdownBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input={
                "document": {
                    "id": "1abc123def456",
                    "name": "Test Document",
                    "mimeType": "application/vnd.google-apps.document",
                },
                "markdown": "**Replaced content**",
                "start_index": 10,
                "end_index": 50,
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                (
                    "result",
                    {"success": True, "requests_count": 2, "characters_deleted": 40},
                ),
                (
                    "document",
                    GoogleDriveFile(
                        id="1abc123def456",
                        name="Test Document",
                        mimeType="application/vnd.google-apps.document",
                        url="https://docs.google.com/document/d/1abc123def456/edit",
                        iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
                        isFolder=False,
                        _credentials_id=None,
                    ),
                ),
            ],
            test_mock={
                "_replace_range_with_markdown": lambda *args, **kwargs: {
                    "success": True,
                    "requests_count": 2,
                    "characters_deleted": 40,
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Validate the inputs, replace the range, and yield the outputs.

        Yields ``("result", dict)`` then ``("document", file)`` on success,
        or a single ``("error", message)`` pair when validation or the API
        call fails.
        """
        if not input_data.document:
            yield "error", "No document selected"
            return

        validation_error = _validate_document_file(input_data.document)
        if validation_error:
            yield "error", validation_error
            return

        if not input_data.markdown:
            yield "error", "No markdown content provided"
            return

        # An empty or inverted range is rejected before any API call is made.
        if input_data.start_index >= input_data.end_index:
            yield "error", "Start index must be less than end index"
            return

        try:
            service = _build_docs_service(credentials)
            # The API client call is blocking, so it runs in a worker thread.
            result = await asyncio.to_thread(
                self._replace_range_with_markdown,
                service,
                input_data.document.id,
                input_data.markdown,
                input_data.start_index,
                input_data.end_index,
            )
            yield "result", result
            yield "document", _make_document_output(input_data.document)
        except Exception as e:
            yield "error", f"Failed to replace range with markdown: {str(e)}"

    def _replace_range_with_markdown(
        self,
        service,
        document_id: str,
        markdown: str,
        start_index: int,
        end_index: int,
    ) -> dict:
        """Delete ``[start_index, end_index)`` and insert Markdown in its place.

        The delete request and the Markdown-generated requests go out in a
        single ``batchUpdate``. The Docs API applies a batch in order and
        rejects it as a whole if any request is invalid, so a failing insert
        cannot leave the range deleted with nothing written back.

        Returns:
            Dict with ``success``, ``requests_count`` (Markdown-generated
            requests only) and ``characters_deleted`` (``end_index -
            start_index``).
        """
        batch: list[dict] = [
            {
                "deleteContentRange": {
                    "range": {"startIndex": start_index, "endIndex": end_index}
                }
            }
        ]

        # Later requests in the batch see the document after the delete, so
        # the insert is anchored at the start of the removed range.
        requests = to_requests(markdown, start_index=start_index)
        if requests:
            batch.extend(requests)

        service.documents().batchUpdate(
            documentId=document_id, body={"requests": batch}
        ).execute()

        return {
            "success": True,
            "requests_count": len(requests),
            "characters_deleted": end_index - start_index,
        }
class GoogleDocsReplaceContentWithMarkdownBlock(Block):
    """Find text in a Google Doc and replace it with formatted Markdown.

    Perfect for template workflows - use placeholders like {{INTRO}} or {{SUMMARY}}
    and replace them with formatted Markdown content.
    """

    class Input(BlockSchemaInput):
        document: GoogleDriveFile = GoogleDriveFileField(
            title="Document",
            description="Select a Google Doc",
            allowed_views=["DOCUMENTS"],
        )
        find_text: str = SchemaField(
            description="Text to find and replace (e.g., '{{PLACEHOLDER}}' or any text)"
        )
        markdown: str = SchemaField(
            description="Markdown content to replace the found text with"
        )
        match_case: bool = SchemaField(
            default=False,
            description="Match case when finding text",
        )

    class Output(BlockSchemaOutput):
        result: dict = SchemaField(description="Result with replacement count")
        document: GoogleDriveFile = SchemaField(
            description="The document for chaining"
        )
        error: str = SchemaField(description="Error message if operation failed")

    def __init__(self):
        """Register the block's metadata, schemas and self-test fixtures."""
        super().__init__(
            id="2cc58467-90a9-4ef8-a7a7-700784f93b76",
            description="Find text and replace it with formatted Markdown - ideal for LLM/AI output and templates",
            categories={BlockCategory.DATA},
            input_schema=GoogleDocsReplaceContentWithMarkdownBlock.Input,
            output_schema=GoogleDocsReplaceContentWithMarkdownBlock.Output,
            disabled=GOOGLE_DOCS_DISABLED,
            test_input={
                "document": {
                    "id": "1abc123def456",
                    "name": "Test Document",
                    "mimeType": "application/vnd.google-apps.document",
                },
                "find_text": "{{PLACEHOLDER}}",
                "markdown": "# Replaced Header\n\nThis is the **replacement** content.",
            },
            test_credentials=TEST_CREDENTIALS,
            test_output=[
                (
                    "result",
                    {"success": True, "replacements_made": 1, "requests_count": 4},
                ),
                (
                    "document",
                    GoogleDriveFile(
                        id="1abc123def456",
                        name="Test Document",
                        mimeType="application/vnd.google-apps.document",
                        url="https://docs.google.com/document/d/1abc123def456/edit",
                        iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
                        isFolder=False,
                        _credentials_id=None,
                    ),
                ),
            ],
            test_mock={
                "_replace_content_with_markdown": lambda *args, **kwargs: {
                    "success": True,
                    "replacements_made": 1,
                    "requests_count": 4,
                },
            },
        )

    async def run(
        self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
    ) -> BlockOutput:
        """Validate the inputs, run the find-and-replace, and yield outputs.

        Yields ``("result", dict)`` then ``("document", file)`` on success,
        or a single ``("error", message)`` pair when validation or the API
        call fails.
        """
        if not input_data.document:
            yield "error", "No document selected"
            return

        validation_error = _validate_document_file(input_data.document)
        if validation_error:
            yield "error", validation_error
            return

        if not input_data.find_text:
            yield "error", "No find text provided"
            return

        if not input_data.markdown:
            yield "error", "No markdown content provided"
            return

        try:
            service = _build_docs_service(credentials)
            # The API client calls are blocking, so they run in a worker thread.
            result = await asyncio.to_thread(
                self._replace_content_with_markdown,
                service,
                input_data.document.id,
                input_data.find_text,
                input_data.markdown,
                input_data.match_case,
            )
            yield "result", result
            yield "document", _make_document_output(input_data.document)
        except Exception as e:
            yield "error", f"Failed to replace content with markdown: {str(e)}"

    @staticmethod
    def _utf16_length(text: str) -> int:
        """Return the length of *text* in UTF-16 code units.

        Docs API indexes count UTF-16 code units, whereas Python string
        offsets count code points; characters outside the BMP (e.g. most
        emoji) therefore occupy two index positions.
        """
        # "surrogatepass" keeps lone surrogates from raising during encoding.
        return len(text.encode("utf-16-le", errors="surrogatepass")) // 2

    def _find_text_positions(
        self, service, document_id: str, find_text: str, match_case: bool
    ) -> list[tuple[int, int]]:
        """Find all non-overlapping occurrences of *find_text* in the body.

        Walks paragraphs (including those nested in table cells) and searches
        each text run separately, so an occurrence that is split across
        several text runs is not detected.

        Matching is done on the original run text (case-insensitively via a
        regex flag when ``match_case`` is false), and offsets are converted
        to UTF-16 code units before being added to the run's ``startIndex``.

        Returns:
            ``(start, end)`` document index pairs in traversal order; ``end``
            is exclusive. Empty when *find_text* is empty.
        """
        # Local import: the module's import block is not part of this class.
        import re

        if not find_text:
            return []

        doc = service.documents().get(documentId=document_id).execute()
        content = doc.get("body", {}).get("content", [])

        flags = 0 if match_case else re.IGNORECASE
        pattern = re.compile(re.escape(find_text), flags)
        positions: list[tuple[int, int]] = []

        def search_in_content(elements: list[dict]) -> None:
            """Recursively search through content elements."""
            for element in elements:
                if "paragraph" in element:
                    for text_elem in element["paragraph"].get("elements", []):
                        if "textRun" not in text_elem:
                            continue
                        text_content = text_elem["textRun"].get("content", "")
                        run_start = text_elem.get("startIndex", 0)

                        # finditer never yields overlapping matches, so the
                        # resulting delete ranges cannot overlap either.
                        for match in pattern.finditer(text_content):
                            doc_start = run_start + self._utf16_length(
                                text_content[: match.start()]
                            )
                            doc_end = doc_start + self._utf16_length(match.group())
                            positions.append((doc_start, doc_end))
                elif "table" in element:
                    # Search within table cells
                    for row in element["table"].get("tableRows", []):
                        for cell in row.get("tableCells", []):
                            search_in_content(cell.get("content", []))

        search_in_content(content)
        return positions

    def _replace_content_with_markdown(
        self,
        service,
        document_id: str,
        find_text: str,
        markdown: str,
        match_case: bool,
    ) -> dict:
        """Replace every occurrence of *find_text* with rendered Markdown.

        Each occurrence is handled with one ``batchUpdate`` that deletes the
        matched range and then applies the Markdown-generated requests at the
        same start index.

        Returns:
            Dict with ``success``, ``replacements_made`` and
            ``requests_count`` (delete requests included).
        """
        positions = self._find_text_positions(
            service, document_id, find_text, match_case
        )
        if not positions:
            return {"success": True, "replacements_made": 0, "requests_count": 0}

        total_requests = 0
        replacements_made = 0

        # Work from the last match to the first so edits made later in the
        # document do not shift the indexes of matches still to be processed.
        for start_index, end_index in reversed(positions):
            combined_requests = [
                {
                    "deleteContentRange": {
                        "range": {"startIndex": start_index, "endIndex": end_index}
                    }
                }
            ]

            md_requests = to_requests(markdown, start_index=start_index)
            if md_requests:
                combined_requests.extend(md_requests)

            # Delete + insert travel together in a single API call.
            service.documents().batchUpdate(
                documentId=document_id, body={"requests": combined_requests}
            ).execute()

            total_requests += len(combined_requests)
            replacements_made += 1

        return {
            "success": True,
            "replacements_made": replacements_made,
            "requests_count": total_requests,
        }
class GoogleDocsGetStructureBlock(Block):
"""Get document structure with index positions for precise editing operations.
Returns content segments with their start/end indexes, making it easy to
target specific parts of the document for insertion, deletion, or formatting.
"""
class Input(BlockSchemaInput):
    # Document whose structure is read (field uses the DOCUMENTS view).
    document: GoogleDriveFile = GoogleDriveFileField(
        allowed_views=["DOCUMENTS"],
        title="Document",
        description="Select a Google Doc to analyze",
    )
    # When true, the hierarchical structure is produced as well.
    detailed: bool = SchemaField(
        description="Return full hierarchical structure instead of flat segments",
        default=False,
    )
class Output(BlockSchemaOutput):
    # Flat per-element summary with start/end indexes.
    segments: list[dict] = SchemaField(
        description="Flat list of content segments with indexes (when detailed=False)",
    )
    # Hierarchical view of the document body.
    structure: dict = SchemaField(
        description="Full hierarchical document structure (when detailed=True)",
    )
    # Pass-through of the input document.
    document: GoogleDriveFile = SchemaField(
        description="The document for chaining",
    )
    # Failure message.
    error: str = SchemaField(
        description="Error message if operation failed",
    )
def __init__(self):
    """Register the block's metadata, schemas and self-test fixtures."""

    def sample_segments() -> list[dict]:
        # Fresh copy of the two example paragraph segments on every call.
        return [
            {
                "type": "paragraph",
                "text": "Hello World",
                "start_index": 1,
                "end_index": 12,
            },
            {
                "type": "paragraph",
                "text": "Second paragraph",
                "start_index": 13,
                "end_index": 29,
            },
        ]

    def mocked_get_structure(*args, **kwargs) -> dict:
        # Stand-in for ``_get_structure`` used by the block self-test.
        return {"segments": sample_segments(), "structure": {}}

    expected_document = GoogleDriveFile(
        id="1abc123def456",
        name="Test Document",
        mimeType="application/vnd.google-apps.document",
        url="https://docs.google.com/document/d/1abc123def456/edit",
        iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png",
        isFolder=False,
        _credentials_id=None,
    )

    super().__init__(
        id="e0561cc1-2154-4abf-bd06-79509348a18e",
        description="Get document structure with index positions for precise editing operations",
        categories={BlockCategory.DATA},
        input_schema=GoogleDocsGetStructureBlock.Input,
        output_schema=GoogleDocsGetStructureBlock.Output,
        disabled=GOOGLE_DOCS_DISABLED,
        test_input={
            "document": {
                "id": "1abc123def456",
                "name": "Test Document",
                "mimeType": "application/vnd.google-apps.document",
            },
            "detailed": False,
        },
        test_credentials=TEST_CREDENTIALS,
        test_output=[
            ("segments", sample_segments()),
            ("structure", {}),
            ("document", expected_document),
        ],
        test_mock={"_get_structure": mocked_get_structure},
    )
async def run(
    self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
) -> BlockOutput:
    """Validate the document, read its structure, and yield the outputs.

    Yields ``segments``, ``structure`` and ``document`` in that order on
    success, or a single ``("error", message)`` pair otherwise.
    """
    document = input_data.document
    if not document:
        yield "error", "No document selected"
        return

    problem = _validate_document_file(document)
    if problem:
        yield "error", problem
        return

    try:
        docs_service = _build_docs_service(credentials)
        # The API client call is blocking, so it runs in a worker thread.
        outcome = await asyncio.to_thread(
            self._get_structure,
            docs_service,
            document.id,
            input_data.detailed,
        )
        for output_name in ("segments", "structure"):
            yield output_name, outcome[output_name]
        yield "document", _make_document_output(document)
    except Exception as e:
        yield "error", f"Failed to get document structure: {e!s}"
def _extract_paragraph_text(self, paragraph: dict) -> str:
"""Extract plain text from a paragraph element."""
text_parts = []
for elem in paragraph.get("elements", []):
if "textRun" in elem:
text_parts.append(elem["textRun"].get("content", ""))
return "".join(text_parts).rstrip("\n")
def _get_paragraph_style(self, paragraph: dict) -> dict:
"""Get paragraph style information."""
style = paragraph.get("paragraphStyle", {})
named_style = style.get("namedStyleType", "NORMAL_TEXT")
# Map named styles to heading levels
heading_map = {
"HEADING_1": 1,
"HEADING_2": 2,
"HEADING_3": 3,
"HEADING_4": 4,
"HEADING_5": 5,
"HEADING_6": 6,
}
if named_style in heading_map:
return {"heading_level": heading_map[named_style]}
return {}
def _process_table_detailed(self, table_element: dict) -> dict:
"""Process table for detailed hierarchical output."""
table = table_element.get("table", {})
table_rows = table.get("tableRows", [])
rows_data = []
for table_row in table_rows:
cells_data = []
for cell in table_row.get("tableCells", []):
cell_content = cell.get("content", [])
cell_text = ""
cell_start = None
cell_end = None
for content_elem in cell_content:
if "paragraph" in content_elem:
cell_text += self._extract_paragraph_text(
content_elem["paragraph"]
)
if cell_start is None:
cell_start = content_elem.get("startIndex")
cell_end = content_elem.get("endIndex")
cells_data.append(
{
"text": cell_text,
"start_index": cell_start,
"end_index": cell_end,
}
)
rows_data.append({"cells": cells_data})
return {
"type": "table",
"start_index": table_element.get("startIndex"),
"end_index": table_element.get("endIndex"),
"rows": rows_data,
"row_count": len(table_rows),
"column_count": table.get("columns", 0),
}
def _get_structure(self, service, document_id: str, detailed: bool) -> dict:
doc = service.documents().get(documentId=document_id).execute()
body = doc.get("body", {})
content = body.get("content", [])
segments: list[dict] = []
structure_body: list[dict] = []
for element in content:
start_index = element.get("startIndex")
end_index = element.get("endIndex")
if "paragraph" in element:
paragraph = element["paragraph"]
text = self._extract_paragraph_text(paragraph)
style_info = self._get_paragraph_style(paragraph)
# Determine segment type
if style_info.get("heading_level"):
seg_type = "heading"
segment = {
"type": seg_type,
"level": style_info["heading_level"],
"text": text,
"start_index": start_index,
"end_index": end_index,
}
else:
seg_type = "paragraph"
segment = {
"type": seg_type,
"text": text,
"start_index": start_index,
"end_index": end_index,
}
# Skip empty paragraphs (just newlines)
if text.strip():
segments.append(segment)
if detailed:
detailed_seg = segment.copy()
detailed_seg["style"] = paragraph.get("paragraphStyle", {})
structure_body.append(detailed_seg)
elif "table" in element:
table = element.get("table", {})
table_rows = table.get("tableRows", [])
segment = {
"type": "table",
"rows": len(table_rows),
"columns": table.get("columns", 0),
"start_index": start_index,
"end_index": end_index,
}
segments.append(segment)
if detailed:
structure_body.append(self._process_table_detailed(element))
elif "sectionBreak" in element:
# Skip section breaks in simple mode, include in detailed
if detailed:
structure_body.append(
{
"type": "section_break",
"start_index": start_index,
"end_index": end_index,
}
)
elif "tableOfContents" in element:
segment = {
"type": "table_of_contents",
"start_index": start_index,
"end_index": end_index,
}
segments.append(segment)
if detailed:
structure_body.append(segment)
result = {
"segments": segments,
"structure": {"body": structure_body} if detailed else {},
}
return result