diff --git a/autogpt_platform/backend/backend/blocks/google/docs.py b/autogpt_platform/backend/backend/blocks/google/docs.py new file mode 100644 index 0000000000..7840cbae73 --- /dev/null +++ b/autogpt_platform/backend/backend/blocks/google/docs.py @@ -0,0 +1,2896 @@ +import asyncio +import re +from enum import Enum +from typing import Any + +from google.oauth2.credentials import Credentials +from googleapiclient.discovery import build +from gravitas_md2gdocs import to_requests + +from backend.blocks.google._drive import GoogleDriveFile, GoogleDriveFileField +from backend.data.block import ( + Block, + BlockCategory, + BlockOutput, + BlockSchemaInput, + BlockSchemaOutput, +) +from backend.data.model import SchemaField +from backend.util.settings import Settings + +from ._auth import ( + GOOGLE_OAUTH_IS_CONFIGURED, + TEST_CREDENTIALS, + TEST_CREDENTIALS_INPUT, + GoogleCredentials, + GoogleCredentialsField, + GoogleCredentialsInput, +) + +settings = Settings() +GOOGLE_DOCS_DISABLED = not GOOGLE_OAUTH_IS_CONFIGURED + + +# ============ Enums ============ + + +class PublicAccessRole(str, Enum): + READER = "reader" + COMMENTER = "commenter" + + +class ShareRole(str, Enum): + READER = "reader" + WRITER = "writer" + COMMENTER = "commenter" + + +# ============ Helper Functions ============ + + +def _build_docs_service(credentials: GoogleCredentials): + """Build Google Docs API service.""" + creds = Credentials( + token=( + credentials.access_token.get_secret_value() + if credentials.access_token + else None + ), + refresh_token=( + credentials.refresh_token.get_secret_value() + if credentials.refresh_token + else None + ), + token_uri="https://oauth2.googleapis.com/token", + client_id=settings.secrets.google_client_id, + client_secret=settings.secrets.google_client_secret, + scopes=credentials.scopes, + ) + return build("docs", "v1", credentials=creds, cache_discovery=False) + + +def _build_drive_service(credentials: GoogleCredentials): + """Build Google Drive API service for file operations.""" + creds = Credentials( + token=( + credentials.access_token.get_secret_value() + if credentials.access_token + else None + ), + refresh_token=( + credentials.refresh_token.get_secret_value() + if credentials.refresh_token + else None + ), + token_uri="https://oauth2.googleapis.com/token", + client_id=settings.secrets.google_client_id, + client_secret=settings.secrets.google_client_secret, + scopes=credentials.scopes, + ) + return build("drive", "v3", credentials=creds, cache_discovery=False) + + +def _validate_document_file(file: GoogleDriveFile) -> str | None: + """Validate that a file is a Google Doc.""" + if not file.id: + return "No document ID provided" + if file.mime_type and file.mime_type != "application/vnd.google-apps.document": + return f"File is not a Google Doc (type: {file.mime_type})" + return None + + +def _parse_hex_color_to_rgb_floats(value: str) -> tuple[float, float, float] | None: + """ + Parse a CSS-like hex color string into normalized RGB floats. + + Supports: + - #RGB / RGB (shorthand) + - #RRGGBB / RRGGBB + + Returns None for malformed inputs. + """ + if not value: + return None + + raw = value.strip() + if raw.startswith("#"): + raw = raw[1:] + + if not re.fullmatch(r"[0-9a-fA-F]{3}([0-9a-fA-F]{3})?", raw): + return None + + if len(raw) == 3: + raw = "".join(ch * 2 for ch in raw) + + r = int(raw[0:2], 16) / 255.0 + g = int(raw[2:4], 16) / 255.0 + b = int(raw[4:6], 16) / 255.0 + return (r, g, b) + + +def _get_document_end_index(service, document_id: str) -> int: + """Get the index at the end of the document body.""" + doc = service.documents().get(documentId=document_id).execute() + body = doc.get("body", {}) + content = body.get("content", []) + if content: + last_element = content[-1] + return last_element.get("endIndex", 1) - 1 + return 1 + + +def _extract_text_from_content(content: list[dict]) -> str: + """Extract plain text from document content structure.""" + text_parts = [] + for element in content: + if "paragraph" in element: + for elem in element["paragraph"].get("elements", []): + if "textRun" in elem: + text_parts.append(elem["textRun"].get("content", "")) + elif "table" in element: + for row in element["table"].get("tableRows", []): + for cell in row.get("tableCells", []): + cell_content = cell.get("content", []) + text_parts.append(_extract_text_from_content(cell_content)) + return "".join(text_parts) + + +# ============ Document Output Helper ============ + + +def _make_document_output(file: GoogleDriveFile) -> GoogleDriveFile: + """Create standardized document output for chaining.""" + return GoogleDriveFile( + id=file.id, + name=file.name, + mimeType="application/vnd.google-apps.document", + url=f"https://docs.google.com/document/d/{file.id}/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=file.credentials_id, + ) + + +# ============ Blocks ============ + + +class GoogleDocsReadBlock(Block): + """Read content from a Google Doc.""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc to read", + allowed_views=["DOCUMENTS"], + ) + + class Output(BlockSchemaOutput): + text: str = SchemaField(description="Plain text content of the document") + title: str = SchemaField(description="Document title") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if read failed") + + def __init__(self): + super().__init__( + id="420a2b3c-5db2-4bda-82bc-a68a862a3d55", + description="Read text content from a Google Doc", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsReadBlock.Input, + output_schema=GoogleDocsReadBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("text", "Hello World\nThis is a test document.\n"), + ("title", "Test Document"), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_read_document": lambda *args, **kwargs: { + "text": "Hello World\nThis is a test document.\n", + "title": "Test Document", + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._read_document, + service, + input_data.document.id, + ) + yield "text", result["text"] + yield "title", result["title"] + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to read document: {str(e)}" + + def _read_document(self, service, document_id: str) -> dict: + doc = service.documents().get(documentId=document_id).execute() + title = doc.get("title", "") + body = doc.get("body", {}) + content = body.get("content", []) + text = _extract_text_from_content(content) + return {"text": text, "title": title} + + +class GoogleDocsCreateBlock(Block): + """Create a new Google Doc.""" + + class Input(BlockSchemaInput): + credentials: GoogleCredentialsInput = GoogleCredentialsField( + ["https://www.googleapis.com/auth/drive.file"] + ) + title: str = SchemaField(description="Title for the new document") + initial_content: str = SchemaField( + default="", + description="Optional initial text content", + ) + + class Output(BlockSchemaOutput): + document: GoogleDriveFile = SchemaField(description="The created document") + document_id: str = SchemaField(description="ID of the created document") + document_url: str = SchemaField(description="URL to open the document") + error: str = SchemaField(description="Error message if creation failed") + + def __init__(self): + super().__init__( + id="d430d941-cf81-4f84-8b19-2e3f670b2fca", + description="Create a new Google Doc", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsCreateBlock.Input, + output_schema=GoogleDocsCreateBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "credentials": TEST_CREDENTIALS_INPUT, + "title": "My New Document", + "initial_content": "Hello, this is the initial content.", + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ( + "document", + GoogleDriveFile( + id="new_doc_123", + name="My New Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/new_doc_123/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=TEST_CREDENTIALS_INPUT["id"], + ), + ), + ("document_id", "new_doc_123"), + ("document_url", "https://docs.google.com/document/d/new_doc_123/edit"), + ], + test_mock={ + "_create_document": lambda *args, **kwargs: { + "document_id": "new_doc_123", + "document_url": "https://docs.google.com/document/d/new_doc_123/edit", + "title": "My New Document", + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.title: + yield "error", "Document title is required" + return + + try: + drive_service = _build_drive_service(credentials) + docs_service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._create_document, + drive_service, + docs_service, + input_data.title, + input_data.initial_content, + ) + doc_id = result["document_id"] + doc_url = result["document_url"] + yield "document", GoogleDriveFile( + id=doc_id, + name=input_data.title, + mimeType="application/vnd.google-apps.document", + url=doc_url, + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=input_data.credentials.id, + ) + yield "document_id", doc_id + yield "document_url", doc_url + except Exception as e: + yield "error", f"Failed to create document: {str(e)}" + + def _create_document( + self, drive_service, docs_service, title: str, initial_content: str + ) -> dict: + # Create the document + file_metadata = { + "name": title, + "mimeType": "application/vnd.google-apps.document", + } + result = drive_service.files().create(body=file_metadata).execute() + document_id = result.get("id") + document_url = f"https://docs.google.com/document/d/{document_id}/edit" + + # Add initial content if provided + if initial_content: + requests = [ + { + "insertText": { + "location": {"index": 1}, + "text": initial_content, + } + } + ] + docs_service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + return { + "document_id": document_id, + "document_url": document_url, + "title": title, + } + + +class GoogleDocsAppendPlainTextBlock(Block): + """Append plain text to the end of a Google Doc (no formatting).""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc to append to", + allowed_views=["DOCUMENTS"], + ) + text: str = SchemaField( + description="Plain text to append (no formatting applied)" + ) + add_newline: bool = SchemaField( + default=True, + description="Add a newline before the appended text", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of the append operation") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if append failed") + + def __init__(self): + super().__init__( + id="ddc29d9f-78dc-4682-8787-c8a76f00cf38", + description="Append plain text to the end of a Google Doc (no formatting applied)", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsAppendPlainTextBlock.Input, + output_schema=GoogleDocsAppendPlainTextBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "text": "This is appended text.", + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True, "characters_added": 23}), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_append_text": lambda *args, **kwargs: { + "success": True, + "characters_added": 23, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._append_text, + service, + input_data.document.id, + input_data.text, + input_data.add_newline, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to append text: {str(e)}" + + def _append_text( + self, service, document_id: str, text: str, add_newline: bool + ) -> dict: + end_index = _get_document_end_index(service, document_id) + text_to_insert = ("\n" if add_newline else "") + text + + requests = [ + { + "insertText": { + "location": {"index": end_index}, + "text": text_to_insert, + } + } + ] + + service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + return {"success": True, "characters_added": len(text_to_insert)} + + +class GoogleDocsInsertPlainTextBlock(Block): + """Insert plain text at a specific position in a Google Doc (no formatting).""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc to insert into", + allowed_views=["DOCUMENTS"], + ) + text: str = SchemaField( + description="Plain text to insert (no formatting applied)" + ) + index: int = SchemaField( + default=1, + description="Position index to insert at (1 = start of document)", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of the insert operation") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if insert failed") + + def __init__(self): + super().__init__( + id="0443fdbc-ebb0-49a4-a3ea-6ace9c14da22", + description="Insert plain text at a specific position in a Google Doc (no formatting applied)", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsInsertPlainTextBlock.Input, + output_schema=GoogleDocsInsertPlainTextBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "text": "Inserted text here. ", + "index": 1, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True, "characters_inserted": 20}), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_insert_text": lambda *args, **kwargs: { + "success": True, + "characters_inserted": 20, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._insert_text, + service, + input_data.document.id, + input_data.text, + input_data.index, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to insert text: {str(e)}" + + def _insert_text(self, service, document_id: str, text: str, index: int) -> dict: + requests = [ + { + "insertText": { + "location": {"index": max(1, index)}, + "text": text, + } + } + ] + + service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + return {"success": True, "characters_inserted": len(text)} + + +class GoogleDocsFindReplacePlainTextBlock(Block): + """Find and replace plain text in a Google Doc (no formatting applied to replacement).""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc", + allowed_views=["DOCUMENTS"], + ) + find_text: str = SchemaField(description="Plain text to find") + replace_text: str = SchemaField( + description="Plain text to replace with (no formatting applied)" + ) + match_case: bool = SchemaField( + default=False, + description="Match case when finding text", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result with replacement count") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="e5046ee2-b094-418e-a25e-c0f90c91721c", + description="Find and replace plain text in a Google Doc (no formatting applied to replacement)", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsFindReplacePlainTextBlock.Input, + output_schema=GoogleDocsFindReplacePlainTextBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "find_text": "old text", + "replace_text": "new text", + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True, "replacements_made": 3}), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_find_replace": lambda *args, **kwargs: { + "success": True, + "replacements_made": 3, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._find_replace, + service, + input_data.document.id, + input_data.find_text, + input_data.replace_text, + input_data.match_case, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to find/replace: {str(e)}" + + def _find_replace( + self, + service, + document_id: str, + find_text: str, + replace_text: str, + match_case: bool, + ) -> dict: + requests = [ + { + "replaceAllText": { + "containsText": { + "text": find_text, + "matchCase": match_case, + }, + "replaceText": replace_text, + } + } + ] + + response = ( + service.documents() + .batchUpdate(documentId=document_id, body={"requests": requests}) + .execute() + ) + + # Get replacement count from response + replies = response.get("replies", []) + replacements = 0 + if replies and "replaceAllText" in replies[0]: + replacements = replies[0]["replaceAllText"].get("occurrencesChanged", 0) + + return {"success": True, "replacements_made": replacements} + + +class GoogleDocsGetMetadataBlock(Block): + """Get metadata about a Google Doc.""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc", + allowed_views=["DOCUMENTS"], + ) + + class Output(BlockSchemaOutput): + title: str = SchemaField(description="Document title") + document_id: str = SchemaField(description="Document ID") + revision_id: str = SchemaField(description="Current revision ID") + document_url: str = SchemaField(description="URL to open the document") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="100bc806-acbf-4dc5-a3a2-998026b96516", + description="Get metadata about a Google Doc", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsGetMetadataBlock.Input, + output_schema=GoogleDocsGetMetadataBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("title", "Test Document"), + ("document_id", "1abc123def456"), + ("revision_id", "rev_123"), + ( + "document_url", + "https://docs.google.com/document/d/1abc123def456/edit", + ), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_get_metadata": lambda *args, **kwargs: { + "title": "Test Document", + "revision_id": "rev_123", + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._get_metadata, + service, + input_data.document.id, + ) + yield "title", result["title"] + yield "document_id", input_data.document.id + yield "revision_id", result["revision_id"] + yield "document_url", f"https://docs.google.com/document/d/{input_data.document.id}/edit" + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to get metadata: {str(e)}" + + def _get_metadata(self, service, document_id: str) -> dict: + doc = service.documents().get(documentId=document_id).execute() + return { + "title": doc.get("title", ""), + "revision_id": doc.get("revisionId", ""), + } + + +class GoogleDocsInsertTableBlock(Block): + """Insert a table into a Google Doc, optionally with content.""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc", + allowed_views=["DOCUMENTS"], + ) + rows: int = SchemaField( + default=3, + description="Number of rows (ignored if content provided)", + ) + columns: int = SchemaField( + default=3, + description="Number of columns (ignored if content provided)", + ) + content: list[list[str]] = SchemaField( + default=[], + description="Optional 2D array of cell content, e.g. [['Header1', 'Header2'], ['Row1Col1', 'Row1Col2']]. If provided, rows/columns are derived from this.", + ) + index: int = SchemaField( + default=0, + description="Position to insert table (0 = end of document)", + ) + format_as_markdown: bool = SchemaField( + default=False, + description="Format cell content as Markdown (headers, bold, links, etc.)", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of table insertion") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="e104b3ab-dfef-45f9-9702-14e950988f53", + description="Insert a table into a Google Doc, optionally with content and Markdown formatting", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsInsertTableBlock.Input, + output_schema=GoogleDocsInsertTableBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "content": [["Header1", "Header2"], ["Row1Col1", "Row1Col2"]], + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ( + "result", + { + "success": True, + "rows": 2, + "columns": 2, + "cells_populated": 4, + "cells_found": 4, + }, + ), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_insert_table": lambda *args, **kwargs: { + "success": True, + "rows": 2, + "columns": 2, + "cells_populated": 4, + "cells_found": 4, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + # Determine rows/columns from content if provided + content = input_data.content + + # Check if content is valid: + # 1. Has at least one row with at least one cell (even if empty string) + # 2. Has at least one non-empty cell value + has_valid_structure = bool(content and any(len(row) > 0 for row in content)) + has_content = has_valid_structure and any( + cell for row in content for cell in row + ) + + if has_content: + # Use content dimensions - filter out empty rows for row count, + # use max column count across all rows + rows = len(content) + columns = max(len(row) for row in content) + else: + # No valid content - use explicit rows/columns, clear content + rows = input_data.rows + columns = input_data.columns + content = [] # Clear so we skip population step + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._insert_table, + service, + input_data.document.id, + rows, + columns, + input_data.index, + content, + input_data.format_as_markdown, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to insert table: {str(e)}" + + def _insert_table( + self, + service, + document_id: str, + rows: int, + columns: int, + index: int, + content: list[list[str]], + format_as_markdown: bool, + ) -> dict: + # If index is 0, insert at end of document + if index == 0: + index = _get_document_end_index(service, document_id) + + # Insert the empty table structure + requests = [ + { + "insertTable": { + "rows": rows, + "columns": columns, + "location": {"index": index}, + } + } + ] + + service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + # If no content provided, we're done + if not content: + return {"success": True, "rows": rows, "columns": columns} + + # Fetch the document to find cell indexes + doc = service.documents().get(documentId=document_id).execute() + body_content = doc.get("body", {}).get("content", []) + + # Find all tables and pick the one we just inserted + # (the one with highest startIndex that's >= our insert point, or the last one if inserted at end) + tables_found = [] + for element in body_content: + if "table" in element: + tables_found.append(element) + + if not tables_found: + return { + "success": True, + "rows": rows, + "columns": columns, + "warning": "Table created but could not find it to populate", + } + + # If we inserted at end (index was high), take the last table + # Otherwise, take the first table at or after our insert index + table_element = None + # Heuristic: rows * columns * 2 estimates the minimum index space a table + # occupies (each cell has at least a start index and structural overhead). + # This helps determine if our insert point was near the document end. + estimated_table_size = rows * columns * 2 + if ( + index + >= _get_document_end_index(service, document_id) - estimated_table_size + ): + # Likely inserted at end - use last table + table_element = tables_found[-1] + else: + for tbl in tables_found: + if tbl.get("startIndex", 0) >= index: + table_element = tbl + break + if not table_element: + table_element = tables_found[-1] + + # Extract cell start indexes from the table structure + # Structure: table -> tableRows -> tableCells -> content[0] -> startIndex + cell_positions: list[tuple[int, int, int]] = [] # (row, col, start_index) + table_data = table_element.get("table", {}) + table_rows_list = table_data.get("tableRows", []) + + for row_idx, table_row in enumerate(table_rows_list): + cells = table_row.get("tableCells", []) + for col_idx, cell in enumerate(cells): + cell_content = cell.get("content", []) + if cell_content: + # Get the start index of the first element in the cell + first_element = cell_content[0] + cell_start = first_element.get("startIndex") + if cell_start is not None: + cell_positions.append((row_idx, col_idx, cell_start)) + + if not cell_positions: + return { + "success": True, + "rows": rows, + "columns": columns, + "warning": f"Table created but could not extract cell positions. Table has {len(table_rows_list)} rows.", + } + + # Sort by index descending so we can insert in reverse order + # (inserting later content first preserves earlier indexes) + cell_positions.sort(key=lambda x: x[2], reverse=True) + + cells_populated = 0 + + if format_as_markdown: + # Markdown formatting: process each cell individually since + # gravitas-md2gdocs requests may have complex interdependencies + for row_idx, col_idx, cell_start in cell_positions: + if row_idx < len(content) and col_idx < len(content[row_idx]): + cell_text = content[row_idx][col_idx] + if not cell_text: + continue + md_requests = to_requests(cell_text, start_index=cell_start) + if md_requests: + service.documents().batchUpdate( + documentId=document_id, body={"requests": md_requests} + ).execute() + cells_populated += 1 + else: + # Plain text: batch all insertions into a single API call + # Cells are sorted by index descending, so earlier requests + # don't affect indices of later ones + all_text_requests = [] + for row_idx, col_idx, cell_start in cell_positions: + if row_idx < len(content) and col_idx < len(content[row_idx]): + cell_text = content[row_idx][col_idx] + if not cell_text: + continue + all_text_requests.append( + { + "insertText": { + "location": {"index": cell_start}, + "text": cell_text, + } + } + ) + cells_populated += 1 + + if all_text_requests: + service.documents().batchUpdate( + documentId=document_id, body={"requests": all_text_requests} + ).execute() + + return { + "success": True, + "rows": rows, + "columns": columns, + "cells_populated": cells_populated, + "cells_found": len(cell_positions), + } + + +class GoogleDocsInsertPageBreakBlock(Block): + """Insert a page break into a Google Doc.""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc", + allowed_views=["DOCUMENTS"], + ) + index: int = SchemaField( + default=0, + description="Position to insert page break (0 = end of document)", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of page break insertion") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="f199e674-803b-4ee8-8bbf-172e6512190b", + description="Insert a page break into a Google Doc", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsInsertPageBreakBlock.Input, + output_schema=GoogleDocsInsertPageBreakBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True}), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_insert_page_break": lambda *args, **kwargs: {"success": True}, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._insert_page_break, + service, + input_data.document.id, + input_data.index, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to insert page break: {str(e)}" + + def _insert_page_break(self, service, document_id: str, index: int) -> dict: + if index == 0: + index = _get_document_end_index(service, document_id) + + requests = [ + { + "insertPageBreak": { + "location": {"index": index}, + } + } + ] + + service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + return {"success": True} + + +class GoogleDocsDeleteContentBlock(Block): + """Delete a range of content from a Google Doc.""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc", + allowed_views=["DOCUMENTS"], + ) + start_index: int = SchemaField( + description="Start index of content to delete (must be >= 1, as index 0 is a section break)", + ge=1, + ) + end_index: int = SchemaField(description="End index of content to delete") + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of delete operation") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="5f9f9fa4-9071-4028-97c2-9d15fb422dc5", + description="Delete a range of content from a Google Doc", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsDeleteContentBlock.Input, + output_schema=GoogleDocsDeleteContentBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "start_index": 10, + "end_index": 50, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True, "characters_deleted": 40}), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_delete_content": lambda *args, **kwargs: { + "success": True, + "characters_deleted": 40, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + if input_data.start_index >= input_data.end_index: + yield "error", "Start index must be less than end index" + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._delete_content, + service, + input_data.document.id, + input_data.start_index, + input_data.end_index, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to delete content: {str(e)}" + + def _delete_content( + self, service, document_id: str, start_index: int, end_index: int + ) -> dict: + requests = [ + { + "deleteContentRange": { + "range": { + "startIndex": start_index, + "endIndex": end_index, + } + } + } + ] + + service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + return {"success": True, "characters_deleted": end_index - start_index} + + +class ExportFormat(str, Enum): + PDF = "application/pdf" + DOCX = "application/vnd.openxmlformats-officedocument.wordprocessingml.document" + ODT = "application/vnd.oasis.opendocument.text" + TXT = "text/plain" + HTML = "text/html" + EPUB = "application/epub+zip" + RTF = "application/rtf" + + +class GoogleDocsExportBlock(Block): + """Export a Google Doc to various formats.""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc to export", + allowed_views=["DOCUMENTS"], + ) + format: ExportFormat = SchemaField( + default=ExportFormat.PDF, + description="Export format", + ) + + class Output(BlockSchemaOutput): + content: str = SchemaField( + description="Exported content (base64 encoded for binary formats)" + ) + mime_type: str = SchemaField(description="MIME type of exported content") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if export failed") + + def __init__(self): + super().__init__( + id="e32d5642-7b51-458c-bd83-75ff96fec299", + description="Export a Google Doc to PDF, Word, text, or other formats", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsExportBlock.Input, + output_schema=GoogleDocsExportBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "format": ExportFormat.TXT, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("content", "This is the document content as plain text."), + ("mime_type", "text/plain"), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_export_document": lambda *args, **kwargs: { + "content": "This is the document content as plain text.", + "mime_type": "text/plain", + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + try: + drive_service = _build_drive_service(credentials) + result = await asyncio.to_thread( + self._export_document, + drive_service, + input_data.document.id, + input_data.format.value, + ) + yield "content", result["content"] + yield "mime_type", result["mime_type"] + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to export document: {str(e)}" + + def _export_document(self, service, document_id: str, mime_type: str) -> dict: + import base64 + + response = ( + service.files().export(fileId=document_id, mimeType=mime_type).execute() + ) + + # For text formats, return as string; for binary, base64 encode + if mime_type in ["text/plain", "text/html"]: + content = ( + response.decode("utf-8") if isinstance(response, bytes) else response + ) + else: + content = base64.b64encode(response).decode("utf-8") + + return {"content": content, "mime_type": mime_type} + + +class GoogleDocsFormatTextBlock(Block): + """Apply formatting to text in a Google Doc.""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc", + allowed_views=["DOCUMENTS"], + ) + start_index: int = SchemaField( + description="Start index of text to format (must be >= 1, as index 0 is a section break)", + ge=1, + ) + end_index: int = SchemaField(description="End index of text to format") + bold: bool = SchemaField( + default=False, + description="Make text bold", + ) + italic: bool = SchemaField( + default=False, + description="Make text italic", + ) + underline: bool = SchemaField( + default=False, + description="Underline text", + ) + font_size: int = SchemaField( + default=0, + description="Font size in points (0 = no change)", + ) + foreground_color: str = SchemaField( + default="", + description="Text color as hex (e.g., #FF0000 for red)", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of format operation") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="04c38a7e-7ee5-4e1a-86c1-9727123577bc", + description="Apply formatting (bold, italic, color, etc.) to text in a Google Doc", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsFormatTextBlock.Input, + output_schema=GoogleDocsFormatTextBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "start_index": 2, + "end_index": 10, + "bold": True, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True}), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_format_text": lambda *args, **kwargs: {"success": True}, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + if input_data.start_index >= input_data.end_index: + yield "error", "Start index must be less than end index" + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._format_text, + service, + input_data.document.id, + input_data.start_index, + input_data.end_index, + input_data.bold, + input_data.italic, + input_data.underline, + input_data.font_size, + input_data.foreground_color, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to format text: {str(e)}" + + def _format_text( + self, + service, + document_id: str, + start_index: int, + end_index: int, + bold: bool, + italic: bool, + underline: bool, + font_size: int, + foreground_color: str, + ) -> dict: + text_style: dict[str, Any] = {} + fields = [] + + if bold: + text_style["bold"] = True + fields.append("bold") + if italic: + text_style["italic"] = True + fields.append("italic") + if underline: + text_style["underline"] = True + fields.append("underline") + if font_size > 0: + text_style["fontSize"] = {"magnitude": font_size, "unit": "PT"} + fields.append("fontSize") + if foreground_color: + rgb = _parse_hex_color_to_rgb_floats(foreground_color) + if rgb is None: + if not fields: + return { + "success": False, + "message": ( + f"Invalid foreground_color: {foreground_color!r}. " + "Expected hex like #RGB or #RRGGBB." + ), + } + # Ignore invalid color, but still apply other formatting. + # This avoids failing the whole operation due to a single bad value. + warning = ( + f"Ignored invalid foreground_color: {foreground_color!r}. " + "Expected hex like #RGB or #RRGGBB." + ) + else: + r, g, b = rgb + text_style["foregroundColor"] = { + "color": {"rgbColor": {"red": r, "green": g, "blue": b}} + } + fields.append("foregroundColor") + warning = None + else: + warning = None + + if not fields: + return {"success": True, "message": "No formatting options specified"} + + requests = [ + { + "updateTextStyle": { + "range": {"startIndex": start_index, "endIndex": end_index}, + "textStyle": text_style, + "fields": ",".join(fields), + } + } + ] + + service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + if warning: + return {"success": True, "warning": warning} + return {"success": True} + + +class GoogleDocsShareBlock(Block): + """Share a Google Doc with specific users.""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc to share", + allowed_views=["DOCUMENTS"], + ) + email: str = SchemaField( + default="", + description="Email address to share with. Leave empty for link sharing.", + ) + role: ShareRole = SchemaField( + default=ShareRole.READER, + description="Permission role for the user", + ) + send_notification: bool = SchemaField( + default=True, + description="Send notification email to the user", + ) + message: str = SchemaField( + default="", + description="Optional message to include in notification email", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of the share operation") + share_link: str = SchemaField(description="Link to the document") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if share failed") + + def __init__(self): + super().__init__( + id="4e7ec771-4cc8-4eb7-ae3d-46377ecdb5d2", + description="Share a Google Doc with specific users", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsShareBlock.Input, + output_schema=GoogleDocsShareBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "email": "test@example.com", + "role": "reader", + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True}), + ("share_link", "https://docs.google.com/document/d/1abc123def456/edit"), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_share_document": lambda *args, **kwargs: { + "success": True, + "share_link": "https://docs.google.com/document/d/1abc123def456/edit", + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + try: + service = _build_drive_service(credentials) + result = await asyncio.to_thread( + self._share_document, + service, + input_data.document.id, + input_data.email, + input_data.role, + input_data.send_notification, + input_data.message, + ) + yield "result", {"success": True} + yield "share_link", result["share_link"] + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to share document: {str(e)}" + + def _share_document( + self, + service, + document_id: str, + email: str, + role: ShareRole, + send_notification: bool, + message: str, + ) -> dict: + share_link = f"https://docs.google.com/document/d/{document_id}/edit" + + if email: + # Share with specific user + permission = {"type": "user", "role": role.value, "emailAddress": email} + + kwargs: dict[str, Any] = { + "fileId": document_id, + "body": permission, + "sendNotificationEmail": send_notification, + } + if message: + kwargs["emailMessage"] = message + + service.permissions().create(**kwargs).execute() + else: + # Create "anyone with the link" permission for link sharing + permission = {"type": "anyone", "role": role.value} + service.permissions().create( + fileId=document_id, + body=permission, + ).execute() + + return {"success": True, "share_link": share_link} + + +class GoogleDocsSetPublicAccessBlock(Block): + """Make a Google Doc publicly accessible or private.""" + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc", + allowed_views=["DOCUMENTS"], + ) + public: bool = SchemaField( + default=True, + description="True to make public, False to make private", + ) + role: PublicAccessRole = SchemaField( + default=PublicAccessRole.READER, + description="Permission role for public access", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of the operation") + share_link: str = SchemaField(description="Link to the document") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="d104f6e1-80af-4fe9-b5a1-3cab20081b6c", + description="Make a Google Doc public or private", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsSetPublicAccessBlock.Input, + output_schema=GoogleDocsSetPublicAccessBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "public": True, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True, "is_public": True}), + ( + "share_link", + "https://docs.google.com/document/d/1abc123def456/edit?usp=sharing", + ), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_set_public_access": lambda *args, **kwargs: { + "success": True, + "is_public": True, + "share_link": "https://docs.google.com/document/d/1abc123def456/edit?usp=sharing", + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + try: + service = _build_drive_service(credentials) + result = await asyncio.to_thread( + self._set_public_access, + service, + input_data.document.id, + input_data.public, + input_data.role, + ) + yield "result", {"success": True, "is_public": result["is_public"]} + yield "share_link", result["share_link"] + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to set public access: {str(e)}" + + def _set_public_access( + self, service, document_id: str, public: bool, role: PublicAccessRole + ) -> dict: + share_link = f"https://docs.google.com/document/d/{document_id}/edit" + + if public: + permission = {"type": "anyone", "role": role.value} + service.permissions().create(fileId=document_id, body=permission).execute() + share_link += "?usp=sharing" + else: + permissions = service.permissions().list(fileId=document_id).execute() + for perm in permissions.get("permissions", []): + if perm.get("type") == "anyone": + service.permissions().delete( + fileId=document_id, permissionId=perm["id"] + ).execute() + + return {"success": True, "is_public": public, "share_link": share_link} + + +# ============ Markdown Blocks ============ + + +class GoogleDocsAppendMarkdownBlock(Block): + """Append Markdown content to the end of a Google Doc. + + Converts Markdown to Google Docs formatting, supporting: + - Headers (H1-H6) + - Bold, italic, strikethrough + - Inline code and code blocks + - Links + - Bulleted and numbered lists + - Blockquotes + + Perfect for AI agents that generate Markdown output. + """ + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc to append to", + allowed_views=["DOCUMENTS"], + ) + markdown: str = SchemaField( + description="Markdown content to append to the document" + ) + add_newline: bool = SchemaField( + default=True, + description="Add a newline before the appended content", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of the append operation") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="60854b69-ecbd-4188-bd89-f7966a4d3b38", + description="Append Markdown content to the end of a Google Doc with full formatting - ideal for LLM/AI output", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsAppendMarkdownBlock.Input, + output_schema=GoogleDocsAppendMarkdownBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "markdown": "# Hello World\n\nThis is **bold** and *italic* text.\n\n- Item 1\n- Item 2", + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True, "requests_count": 5}), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_append_markdown": lambda *args, **kwargs: { + "success": True, + "requests_count": 5, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + if not input_data.markdown: + yield "error", "No markdown content provided" + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._append_markdown, + service, + input_data.document.id, + input_data.markdown, + input_data.add_newline, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to append markdown: {str(e)}" + + def _append_markdown( + self, + service, + document_id: str, + markdown: str, + add_newline: bool, + ) -> dict: + end_index = _get_document_end_index(service, document_id) + + # Optionally add a newline before the content + if add_newline and end_index > 1: + newline_requests = [ + {"insertText": {"location": {"index": end_index}, "text": "\n"}} + ] + service.documents().batchUpdate( + documentId=document_id, body={"requests": newline_requests} + ).execute() + end_index += 1 + + # Convert markdown to Google Docs requests + requests = to_requests(markdown, start_index=end_index) + + if requests: + service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + return {"success": True, "requests_count": len(requests)} + + +class GoogleDocsReplaceAllWithMarkdownBlock(Block): + """Replace entire Google Doc content with Markdown. + + Clears the document and inserts formatted Markdown content. + Supports headers, bold, italic, lists, links, code blocks, etc. + """ + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc to replace content in", + allowed_views=["DOCUMENTS"], + ) + markdown: str = SchemaField( + description="Markdown content to replace the document with" + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of the replace operation") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="b6cfb2de-5f0b-437c-b29d-45aebbda9c00", + description="Replace entire Google Doc content with formatted Markdown - ideal for LLM/AI output", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsReplaceAllWithMarkdownBlock.Input, + output_schema=GoogleDocsReplaceAllWithMarkdownBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "markdown": "# New Document\n\nThis replaces everything.", + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True, "requests_count": 3}), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_replace_all_with_markdown": lambda *args, **kwargs: { + "success": True, + "requests_count": 3, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + if not input_data.markdown: + yield "error", "No markdown content provided" + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._replace_all_with_markdown, + service, + input_data.document.id, + input_data.markdown, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to replace document with markdown: {str(e)}" + + def _replace_all_with_markdown( + self, + service, + document_id: str, + markdown: str, + ) -> dict: + # Delete all existing content + doc_end = _get_document_end_index(service, document_id) + if doc_end > 1: + delete_requests = [ + { + "deleteContentRange": { + "range": {"startIndex": 1, "endIndex": doc_end} + } + } + ] + service.documents().batchUpdate( + documentId=document_id, body={"requests": delete_requests} + ).execute() + + # Insert markdown at beginning + requests = to_requests(markdown, start_index=1) + + if requests: + service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + return {"success": True, "requests_count": len(requests)} + + +class GoogleDocsInsertMarkdownAtBlock(Block): + """Insert Markdown content at a specific position in a Google Doc. + + Converts Markdown to Google Docs formatting and inserts at the specified index. + """ + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc to insert into", + allowed_views=["DOCUMENTS"], + ) + markdown: str = SchemaField(description="Markdown content to insert") + index: int = SchemaField( + default=1, + description="Position index to insert at (1 = start of document)", + ge=1, + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of the insert operation") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="76e94b04-e02f-4981-8cb8-47ece1be18b4", + description="Insert formatted Markdown at a specific position in a Google Doc - ideal for LLM/AI output", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsInsertMarkdownAtBlock.Input, + output_schema=GoogleDocsInsertMarkdownAtBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "markdown": "## Inserted Section\n\nThis was inserted.", + "index": 1, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ("result", {"success": True, "requests_count": 3}), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_insert_markdown_at": lambda *args, **kwargs: { + "success": True, + "requests_count": 3, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + if not input_data.markdown: + yield "error", "No markdown content provided" + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._insert_markdown_at, + service, + input_data.document.id, + input_data.markdown, + input_data.index, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to insert markdown: {str(e)}" + + def _insert_markdown_at( + self, + service, + document_id: str, + markdown: str, + index: int, + ) -> dict: + requests = to_requests(markdown, start_index=index) + + if requests: + service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + return {"success": True, "requests_count": len(requests)} + + +class GoogleDocsReplaceRangeWithMarkdownBlock(Block): + """Replace a specific range (by index) in a Google Doc with Markdown. + + Deletes content between start and end indices, then inserts formatted Markdown. + """ + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc", + allowed_views=["DOCUMENTS"], + ) + markdown: str = SchemaField( + description="Markdown content to insert in place of the range" + ) + start_index: int = SchemaField( + description="Start index of the range to replace (must be >= 1)", + ge=1, + ) + end_index: int = SchemaField( + description="End index of the range to replace", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result of the replace operation") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="9e43a905-a918-4da0-8874-dfddd3c46953", + description="Replace a specific index range in a Google Doc with formatted Markdown - ideal for LLM/AI output", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsReplaceRangeWithMarkdownBlock.Input, + output_schema=GoogleDocsReplaceRangeWithMarkdownBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "markdown": "**Replaced content**", + "start_index": 10, + "end_index": 50, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ( + "result", + {"success": True, "requests_count": 2, "characters_deleted": 40}, + ), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_replace_range_with_markdown": lambda *args, **kwargs: { + "success": True, + "requests_count": 2, + "characters_deleted": 40, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + if not input_data.markdown: + yield "error", "No markdown content provided" + return + + if input_data.start_index >= input_data.end_index: + yield "error", "Start index must be less than end index" + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._replace_range_with_markdown, + service, + input_data.document.id, + input_data.markdown, + input_data.start_index, + input_data.end_index, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to replace range with markdown: {str(e)}" + + def _replace_range_with_markdown( + self, + service, + document_id: str, + markdown: str, + start_index: int, + end_index: int, + ) -> dict: + # Delete the range first + delete_requests = [ + { + "deleteContentRange": { + "range": {"startIndex": start_index, "endIndex": end_index} + } + } + ] + service.documents().batchUpdate( + documentId=document_id, body={"requests": delete_requests} + ).execute() + + # Insert markdown at the start of the deleted range + requests = to_requests(markdown, start_index=start_index) + + if requests: + service.documents().batchUpdate( + documentId=document_id, body={"requests": requests} + ).execute() + + return { + "success": True, + "requests_count": len(requests), + "characters_deleted": end_index - start_index, + } + + +class GoogleDocsReplaceContentWithMarkdownBlock(Block): + """Find text in a Google Doc and replace it with formatted Markdown. + + Perfect for template workflows - use placeholders like {{INTRO}} or {{SUMMARY}} + and replace them with formatted Markdown content. + """ + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc", + allowed_views=["DOCUMENTS"], + ) + find_text: str = SchemaField( + description="Text to find and replace (e.g., '{{PLACEHOLDER}}' or any text)" + ) + markdown: str = SchemaField( + description="Markdown content to replace the found text with" + ) + match_case: bool = SchemaField( + default=False, + description="Match case when finding text", + ) + + class Output(BlockSchemaOutput): + result: dict = SchemaField(description="Result with replacement count") + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="2cc58467-90a9-4ef8-a7a7-700784f93b76", + description="Find text and replace it with formatted Markdown - ideal for LLM/AI output and templates", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsReplaceContentWithMarkdownBlock.Input, + output_schema=GoogleDocsReplaceContentWithMarkdownBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "find_text": "{{PLACEHOLDER}}", + "markdown": "# Replaced Header\n\nThis is the **replacement** content.", + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ( + "result", + {"success": True, "replacements_made": 1, "requests_count": 4}, + ), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_replace_content_with_markdown": lambda *args, **kwargs: { + "success": True, + "replacements_made": 1, + "requests_count": 4, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + if not input_data.find_text: + yield "error", "No find text provided" + return + + if not input_data.markdown: + yield "error", "No markdown content provided" + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._replace_content_with_markdown, + service, + input_data.document.id, + input_data.find_text, + input_data.markdown, + input_data.match_case, + ) + yield "result", result + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to replace content with markdown: {str(e)}" + + def _find_text_positions( + self, service, document_id: str, find_text: str, match_case: bool + ) -> list[tuple[int, int]]: + """Find all positions of the search text using actual document indices. + + Iterates through document content and uses the real startIndex/endIndex + from text runs, rather than trying to map plain text offsets to indices. + """ + doc = service.documents().get(documentId=document_id).execute() + body = doc.get("body", {}) + content = body.get("content", []) + + positions = [] + search_text = find_text if match_case else find_text.lower() + + def search_in_content(elements: list[dict]) -> None: + """Recursively search through content elements.""" + for element in elements: + if "paragraph" in element: + for text_elem in element["paragraph"].get("elements", []): + if "textRun" in text_elem: + text_run = text_elem["textRun"] + text_content = text_run.get("content", "") + start_index = text_elem.get("startIndex", 0) + + # Search within this text run + text_to_search = ( + text_content if match_case else text_content.lower() + ) + offset = 0 + while True: + pos = text_to_search.find(search_text, offset) + if pos == -1: + break + # Calculate actual document indices + doc_start = start_index + pos + doc_end = doc_start + len(find_text) + positions.append((doc_start, doc_end)) + offset = pos + 1 + + elif "table" in element: + # Search within table cells + for row in element["table"].get("tableRows", []): + for cell in row.get("tableCells", []): + search_in_content(cell.get("content", [])) + + search_in_content(content) + return positions + + def _replace_content_with_markdown( + self, + service, + document_id: str, + find_text: str, + markdown: str, + match_case: bool, + ) -> dict: + # Find all positions of the text + positions = self._find_text_positions( + service, document_id, find_text, match_case + ) + + if not positions: + return {"success": True, "replacements_made": 0, "requests_count": 0} + + total_requests = 0 + replacements_made = 0 + + # Process in reverse order to maintain correct indices + for start_index, end_index in reversed(positions): + # Build combined request: delete first, then insert markdown + # Combining into single batchUpdate reduces API calls by half + combined_requests = [ + { + "deleteContentRange": { + "range": {"startIndex": start_index, "endIndex": end_index} + } + } + ] + + # Get markdown insert requests + md_requests = to_requests(markdown, start_index=start_index) + if md_requests: + combined_requests.extend(md_requests) + + # Execute delete + insert in single API call + service.documents().batchUpdate( + documentId=document_id, body={"requests": combined_requests} + ).execute() + + total_requests += len(combined_requests) + replacements_made += 1 + + return { + "success": True, + "replacements_made": replacements_made, + "requests_count": total_requests, + } + + +class GoogleDocsGetStructureBlock(Block): + """Get document structure with index positions for precise editing operations. + + Returns content segments with their start/end indexes, making it easy to + target specific parts of the document for insertion, deletion, or formatting. + """ + + class Input(BlockSchemaInput): + document: GoogleDriveFile = GoogleDriveFileField( + title="Document", + description="Select a Google Doc to analyze", + allowed_views=["DOCUMENTS"], + ) + detailed: bool = SchemaField( + default=False, + description="Return full hierarchical structure instead of flat segments", + ) + + class Output(BlockSchemaOutput): + segments: list[dict] = SchemaField( + description="Flat list of content segments with indexes (when detailed=False)" + ) + structure: dict = SchemaField( + description="Full hierarchical document structure (when detailed=True)" + ) + document: GoogleDriveFile = SchemaField(description="The document for chaining") + error: str = SchemaField(description="Error message if operation failed") + + def __init__(self): + super().__init__( + id="e0561cc1-2154-4abf-bd06-79509348a18e", + description="Get document structure with index positions for precise editing operations", + categories={BlockCategory.DATA}, + input_schema=GoogleDocsGetStructureBlock.Input, + output_schema=GoogleDocsGetStructureBlock.Output, + disabled=GOOGLE_DOCS_DISABLED, + test_input={ + "document": { + "id": "1abc123def456", + "name": "Test Document", + "mimeType": "application/vnd.google-apps.document", + }, + "detailed": False, + }, + test_credentials=TEST_CREDENTIALS, + test_output=[ + ( + "segments", + [ + { + "type": "paragraph", + "text": "Hello World", + "start_index": 1, + "end_index": 12, + }, + { + "type": "paragraph", + "text": "Second paragraph", + "start_index": 13, + "end_index": 29, + }, + ], + ), + ("structure", {}), + ( + "document", + GoogleDriveFile( + id="1abc123def456", + name="Test Document", + mimeType="application/vnd.google-apps.document", + url="https://docs.google.com/document/d/1abc123def456/edit", + iconUrl="https://www.gstatic.com/images/branding/product/1x/docs_48dp.png", + isFolder=False, + _credentials_id=None, + ), + ), + ], + test_mock={ + "_get_structure": lambda *args, **kwargs: { + "segments": [ + { + "type": "paragraph", + "text": "Hello World", + "start_index": 1, + "end_index": 12, + }, + { + "type": "paragraph", + "text": "Second paragraph", + "start_index": 13, + "end_index": 29, + }, + ], + "structure": {}, + }, + }, + ) + + async def run( + self, input_data: Input, *, credentials: GoogleCredentials, **kwargs + ) -> BlockOutput: + if not input_data.document: + yield "error", "No document selected" + return + + validation_error = _validate_document_file(input_data.document) + if validation_error: + yield "error", validation_error + return + + try: + service = _build_docs_service(credentials) + result = await asyncio.to_thread( + self._get_structure, + service, + input_data.document.id, + input_data.detailed, + ) + yield "segments", result["segments"] + yield "structure", result["structure"] + yield "document", _make_document_output(input_data.document) + except Exception as e: + yield "error", f"Failed to get document structure: {str(e)}" + + def _extract_paragraph_text(self, paragraph: dict) -> str: + """Extract plain text from a paragraph element.""" + text_parts = [] + for elem in paragraph.get("elements", []): + if "textRun" in elem: + text_parts.append(elem["textRun"].get("content", "")) + return "".join(text_parts).rstrip("\n") + + def _get_paragraph_style(self, paragraph: dict) -> dict: + """Get paragraph style information.""" + style = paragraph.get("paragraphStyle", {}) + named_style = style.get("namedStyleType", "NORMAL_TEXT") + + # Map named styles to heading levels + heading_map = { + "HEADING_1": 1, + "HEADING_2": 2, + "HEADING_3": 3, + "HEADING_4": 4, + "HEADING_5": 5, + "HEADING_6": 6, + } + + if named_style in heading_map: + return {"heading_level": heading_map[named_style]} + return {} + + def _process_table_detailed(self, table_element: dict) -> dict: + """Process table for detailed hierarchical output.""" + table = table_element.get("table", {}) + table_rows = table.get("tableRows", []) + + rows_data = [] + for table_row in table_rows: + cells_data = [] + for cell in table_row.get("tableCells", []): + cell_content = cell.get("content", []) + cell_text = "" + cell_start = None + cell_end = None + + for content_elem in cell_content: + if "paragraph" in content_elem: + cell_text += self._extract_paragraph_text( + content_elem["paragraph"] + ) + if cell_start is None: + cell_start = content_elem.get("startIndex") + cell_end = content_elem.get("endIndex") + + cells_data.append( + { + "text": cell_text, + "start_index": cell_start, + "end_index": cell_end, + } + ) + rows_data.append({"cells": cells_data}) + + return { + "type": "table", + "start_index": table_element.get("startIndex"), + "end_index": table_element.get("endIndex"), + "rows": rows_data, + "row_count": len(table_rows), + "column_count": table.get("columns", 0), + } + + def _get_structure(self, service, document_id: str, detailed: bool) -> dict: + doc = service.documents().get(documentId=document_id).execute() + body = doc.get("body", {}) + content = body.get("content", []) + + segments: list[dict] = [] + structure_body: list[dict] = [] + + for element in content: + start_index = element.get("startIndex") + end_index = element.get("endIndex") + + if "paragraph" in element: + paragraph = element["paragraph"] + text = self._extract_paragraph_text(paragraph) + style_info = self._get_paragraph_style(paragraph) + + # Determine segment type + if style_info.get("heading_level"): + seg_type = "heading" + segment = { + "type": seg_type, + "level": style_info["heading_level"], + "text": text, + "start_index": start_index, + "end_index": end_index, + } + else: + seg_type = "paragraph" + segment = { + "type": seg_type, + "text": text, + "start_index": start_index, + "end_index": end_index, + } + + # Skip empty paragraphs (just newlines) + if text.strip(): + segments.append(segment) + + if detailed: + detailed_seg = segment.copy() + detailed_seg["style"] = paragraph.get("paragraphStyle", {}) + structure_body.append(detailed_seg) + + elif "table" in element: + table = element.get("table", {}) + table_rows = table.get("tableRows", []) + + segment = { + "type": "table", + "rows": len(table_rows), + "columns": table.get("columns", 0), + "start_index": start_index, + "end_index": end_index, + } + segments.append(segment) + + if detailed: + structure_body.append(self._process_table_detailed(element)) + + elif "sectionBreak" in element: + # Skip section breaks in simple mode, include in detailed + if detailed: + structure_body.append( + { + "type": "section_break", + "start_index": start_index, + "end_index": end_index, + } + ) + + elif "tableOfContents" in element: + segment = { + "type": "table_of_contents", + "start_index": start_index, + "end_index": end_index, + } + segments.append(segment) + + if detailed: + structure_body.append(segment) + + result = { + "segments": segments, + "structure": {"body": structure_body} if detailed else {}, + } + + return result diff --git a/autogpt_platform/backend/poetry.lock b/autogpt_platform/backend/poetry.lock index 9aa55a6c22..9a2983fd07 100644 --- a/autogpt_platform/backend/poetry.lock +++ b/autogpt_platform/backend/poetry.lock @@ -1906,6 +1906,22 @@ httpx = {version = ">=0.26,<0.29", extras = ["http2"]} pydantic = ">=1.10,<3" pyjwt = ">=2.10.1,<3.0.0" +[[package]] +name = "gravitas-md2gdocs" +version = "0.1.0" +description = "Convert Markdown to Google Docs API requests" +optional = false +python-versions = ">=3.10" +groups = ["main"] +files = [ + {file = "gravitas_md2gdocs-0.1.0-py3-none-any.whl", hash = "sha256:0cb0627779fdd65c1604818af4142eea1b25d055060183363de1bae4d9e46508"}, + {file = "gravitas_md2gdocs-0.1.0.tar.gz", hash = "sha256:bb3122fe9fa35c528f3f00b785d3f1398d350082d5d03f60f56c895bdcc68033"}, +] + +[package.extras] +dev = ["google-auth-oauthlib (>=1.0.0)", "pytest (>=7.0.0)", "pytest-cov (>=4.0.0)", "python-dotenv (>=1.0.0)", "ruff (>=0.1.0)"] +google = ["google-api-python-client (>=2.0.0)", "google-auth (>=2.0.0)"] + [[package]] name = "gravitasml" version = "0.1.3" @@ -7279,4 +7295,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.14" -content-hash = "13b191b2a1989d3321ff713c66ff6f5f4f3b82d15df4d407e0e5dbf87d7522c4" +content-hash = "b762806d5d58fcf811220890c4705a16dc62b33387af43e3a29399c62a641098" diff --git a/autogpt_platform/backend/pyproject.toml b/autogpt_platform/backend/pyproject.toml index fb06b65162..1f13bd6897 100644 --- a/autogpt_platform/backend/pyproject.toml +++ b/autogpt_platform/backend/pyproject.toml @@ -82,6 +82,7 @@ firecrawl-py = "^4.3.6" exa-py = "^1.14.20" croniter = "^6.0.0" stagehand = "^0.5.1" +gravitas-md2gdocs = "^0.1.0" [tool.poetry.group.dev.dependencies] aiohappyeyeballs = "^2.6.1" diff --git a/autogpt_platform/backend/test/blocks/test_google_docs_format_text_color.py b/autogpt_platform/backend/test/blocks/test_google_docs_format_text_color.py new file mode 100644 index 0000000000..6a6e8e07c8 --- /dev/null +++ b/autogpt_platform/backend/test/blocks/test_google_docs_format_text_color.py @@ -0,0 +1,113 @@ +from unittest.mock import Mock + +from backend.blocks.google.docs import GoogleDocsFormatTextBlock + + +def _make_mock_docs_service() -> Mock: + service = Mock() + # Ensure chained call exists: service.documents().batchUpdate(...).execute() + service.documents.return_value.batchUpdate.return_value.execute.return_value = {} + return service + + +def test_format_text_parses_shorthand_hex_color(): + block = GoogleDocsFormatTextBlock() + service = _make_mock_docs_service() + + result = block._format_text( + service, + document_id="doc_1", + start_index=1, + end_index=2, + bold=False, + italic=False, + underline=False, + font_size=0, + foreground_color="#FFF", + ) + + assert result["success"] is True + + # Verify request body contains correct rgbColor for white. + _, kwargs = service.documents.return_value.batchUpdate.call_args + requests = kwargs["body"]["requests"] + rgb = requests[0]["updateTextStyle"]["textStyle"]["foregroundColor"]["color"][ + "rgbColor" + ] + assert rgb == {"red": 1.0, "green": 1.0, "blue": 1.0} + + +def test_format_text_parses_full_hex_color(): + block = GoogleDocsFormatTextBlock() + service = _make_mock_docs_service() + + result = block._format_text( + service, + document_id="doc_1", + start_index=1, + end_index=2, + bold=False, + italic=False, + underline=False, + font_size=0, + foreground_color="#FF0000", + ) + + assert result["success"] is True + + _, kwargs = service.documents.return_value.batchUpdate.call_args + requests = kwargs["body"]["requests"] + rgb = requests[0]["updateTextStyle"]["textStyle"]["foregroundColor"]["color"][ + "rgbColor" + ] + assert rgb == {"red": 1.0, "green": 0.0, "blue": 0.0} + + +def test_format_text_ignores_invalid_color_when_other_fields_present(): + block = GoogleDocsFormatTextBlock() + service = _make_mock_docs_service() + + result = block._format_text( + service, + document_id="doc_1", + start_index=1, + end_index=2, + bold=True, + italic=False, + underline=False, + font_size=0, + foreground_color="#GGG", + ) + + assert result["success"] is True + assert "warning" in result + + # Should still apply bold, but should NOT include foregroundColor in textStyle. + _, kwargs = service.documents.return_value.batchUpdate.call_args + requests = kwargs["body"]["requests"] + text_style = requests[0]["updateTextStyle"]["textStyle"] + fields = requests[0]["updateTextStyle"]["fields"] + + assert text_style == {"bold": True} + assert fields == "bold" + + +def test_format_text_invalid_color_only_does_not_call_api(): + block = GoogleDocsFormatTextBlock() + service = _make_mock_docs_service() + + result = block._format_text( + service, + document_id="doc_1", + start_index=1, + end_index=2, + bold=False, + italic=False, + underline=False, + font_size=0, + foreground_color="#F", + ) + + assert result["success"] is False + assert "Invalid foreground_color" in result["message"] + service.documents.return_value.batchUpdate.assert_not_called()