feat(blocks/gmail): add Gmail thread blocks (#9965)

This PR adds two new Gmail integration blocks—**Gmail Get Thread** and
**Gmail Reply**—to the platform, enhancing threaded email workflows. Key
changes include:

- **GmailGetThreadBlock**:  
  - New block that retrieves an entire Gmail thread by `threadId`, with an
    option to include or exclude messages from Spam and Trash.
  - Supports use cases like fetching all messages in a conversation to
    check for responses.

- **GmailReplyBlock**:  
  - New block that sends a reply within an existing Gmail thread,
    maintaining the thread context.
  - Accepts detailed input fields including recipients, CC, BCC, subject,
    body, and attachments.
  - Ensures replies are properly associated with their parent message and
    thread.

- **Enhancements to existing Gmail blocks**:  
  - The `Email` model and related outputs now include a `threadId` field.
  - Updated test data and mock data to support threaded operations.
  - Expanded OAuth scopes for actions requiring thread metadata.

- **Documentation updates**:  
  - Added documentation for the new Gmail blocks in both the general block
    listing and the detailed Gmail block docs.
  - Clarified that the `Email` output now includes the `threadId`.

These updates enable more advanced and context-aware Gmail automations,
such as fetching full conversations and replying inline, supporting
richer communication workflows with Gmail.

## Checklist 📋

### For code changes

- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Try all the gmail blocks
  - [x] Send an email reply based on a thread from the get thread block

---------

Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
This commit is contained in:
Toran Bruce Richards
2025-07-17 21:11:31 +01:00
committed by GitHub
parent 725440ff38
commit e50366726c
3 changed files with 426 additions and 14 deletions

View File

@@ -1,6 +1,7 @@
import asyncio
import base64
from email.utils import parseaddr
from email.utils import getaddresses, parseaddr
from pathlib import Path
from typing import List
from google.oauth2.credentials import Credentials
@@ -9,6 +10,7 @@ from pydantic import BaseModel
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
from backend.util.settings import Settings
from ._auth import (
@@ -29,6 +31,7 @@ class Attachment(BaseModel):
class Email(BaseModel):
threadId: str
id: str
subject: str
snippet: str
@@ -40,6 +43,12 @@ class Email(BaseModel):
attachments: List[Attachment]
# Pydantic model describing one Gmail thread: an ID, its messages, and a history ID.
# NOTE(review): field names mirror the keys used in GmailGetThreadBlock's fixtures
# ("id", "messages", "historyId"); presumably they follow the Gmail API thread
# resource — confirm against the API reference.
class Thread(BaseModel):
# Thread identifier.
id: str
# Messages belonging to the thread, each represented as an `Email`.
messages: list[Email]
# History identifier carried alongside the thread (kept as a string).
historyId: str
class GmailReadBlock(Block):
class Input(BlockSchema):
credentials: GoogleCredentialsInput = GoogleCredentialsField(
@@ -83,6 +92,7 @@ class GmailReadBlock(Block):
(
"email",
{
"threadId": "t1",
"id": "1",
"subject": "Test Email",
"snippet": "This is a test email",
@@ -98,6 +108,7 @@ class GmailReadBlock(Block):
"emails",
[
{
"threadId": "t1",
"id": "1",
"subject": "Test Email",
"snippet": "This is a test email",
@@ -114,6 +125,7 @@ class GmailReadBlock(Block):
test_mock={
"_read_emails": lambda *args, **kwargs: [
{
"threadId": "t1",
"id": "1",
"subject": "Test Email",
"snippet": "This is a test email",
@@ -134,7 +146,11 @@ class GmailReadBlock(Block):
) -> BlockOutput:
service = GmailReadBlock._build_service(credentials, **kwargs)
messages = await asyncio.to_thread(
self._read_emails, service, input_data.query, input_data.max_results
self._read_emails,
service,
input_data.query,
input_data.max_results,
credentials.scopes,
)
for email in messages:
yield "email", email
@@ -161,22 +177,31 @@ class GmailReadBlock(Block):
return build("gmail", "v1", credentials=creds)
def _read_emails(
self, service, query: str | None, max_results: int | None
self,
service,
query: str | None,
max_results: int | None,
scopes: list[str] | None,
) -> list[Email]:
results = (
service.users()
.messages()
.list(userId="me", q=query or "", maxResults=max_results or 10)
.execute()
)
scopes = [s.lower() for s in (scopes or [])]
list_kwargs = {"userId": "me", "maxResults": max_results or 10}
if query and "https://www.googleapis.com/auth/gmail.metadata" not in scopes:
list_kwargs["q"] = query
results = service.users().messages().list(**list_kwargs).execute()
messages = results.get("messages", [])
email_data = []
for message in messages:
format_type = (
"metadata"
if "https://www.googleapis.com/auth/gmail.metadata" in scopes
else "full"
)
msg = (
service.users()
.messages()
.get(userId="me", id=message["id"], format="full")
.get(userId="me", id=message["id"], format=format_type)
.execute()
)
@@ -188,6 +213,7 @@ class GmailReadBlock(Block):
attachments = self._get_attachments(service, msg)
email = Email(
threadId=msg["threadId"],
id=msg["id"],
subject=headers.get("subject", "No Subject"),
snippet=msg["snippet"],
@@ -374,7 +400,6 @@ class GmailSendBlock(Block):
return {"id": sent_message["id"], "status": "sent"}
def _create_message(self, to: str, subject: str, body: str) -> dict:
import base64
from email.mime.text import MIMEText
message = MIMEText(body)
@@ -599,3 +624,329 @@ class GmailRemoveLabelBlock(Block):
if label["name"] == label_name:
return label["id"]
return None
# Block that fetches a single Gmail thread by its ID.
class GmailGetThreadBlock(Block):
# Inputs: Google credentials plus the ID of the thread to fetch.
class Input(BlockSchema):
# This block requests the read-only Gmail OAuth scope.
credentials: GoogleCredentialsInput = GoogleCredentialsField(
["https://www.googleapis.com/auth/gmail.readonly"]
)
threadId: str = SchemaField(description="Gmail thread ID")
# Outputs: the fetched thread, or an error message.
class Output(BlockSchema):
thread: Thread = SchemaField(
description="Gmail thread with decoded message bodies"
)
error: str = SchemaField(description="Error message if any")
def __init__(self):
    """Register block metadata, schemas, and the self-test fixtures."""

    def sample_thread() -> dict:
        # Built fresh on every call so the expected output and the value
        # returned by the mocked `_get_thread` never share a mutable object.
        sample_attachment = {
            "size": 5694,
            "filename": "frog.jpg",
            "content_type": "image/jpeg",
            "attachment_id": "ANGjdJ_f777CvJ37TdHYSPIPPqJ0HVNgze1uM8alw5iiqTqAVXjsmBWxOWXrY3Z4W4rEJHfAcHVx54_TbtcZIVJJEqJfAD5LoUOK9_zKCRwwcTJ5TGgjsXcZNSnOJNazM-m4E6buo2-p0WNcA_hqQvuA36nzS31Olx3m2x7BaG1ILOkBcjlKJl4KCcR0AvnfK0S02k8i-bZVqII7XXrNp21f1BDolxH7tiEhkz3d5p-5Lbro24olgOWQwQk0SCJsTWWBMCVgbxU7oLt1QmPcjANxfpvh69Qfap3htvQxFa9P08NDI2YqQkry9yPxVR7ZBJQWrqO35EWmhNySEiX5pfG8SDRmfP9O_BqxTH35nEXmSOvZH9zb214iM-zfSoPSU1F5Fo71",
        }
        sample_message = {
            "id": "188199feff9dc907",
            "to": "nick@example.co",
            "body": "This email does not contain a text body.",
            "date": "Thu, 17 Jul 2025 19:22:36 +0100",
            "from_": "bent@example.co",
            "snippet": "have a funny looking car -- Bently, Community Administrator For AutoGPT",
            "subject": "car",
            "threadId": "188199feff9dc907",
            "attachments": [sample_attachment],
            "sizeEstimate": 14099,
        }
        return {
            "id": "188199feff9dc907",
            "messages": [sample_message],
            "historyId": "645006",
        }

    super().__init__(
        id="21a79166-9df7-4b5f-9f36-96f639d86112",
        description="Get a full Gmail thread by ID",
        categories={BlockCategory.COMMUNICATION},
        input_schema=GmailGetThreadBlock.Input,
        output_schema=GmailGetThreadBlock.Output,
        disabled=not GOOGLE_OAUTH_IS_CONFIGURED,
        test_input={"threadId": "t1", "credentials": TEST_CREDENTIALS_INPUT},
        test_credentials=TEST_CREDENTIALS,
        test_output=[("thread", sample_thread())],
        test_mock={"_get_thread": lambda *args, **kwargs: sample_thread()},
    )
async def run(
    self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
) -> BlockOutput:
    """Fetch the requested thread and yield it on the ``thread`` output.

    Args:
        input_data: Block input carrying ``threadId``.
        credentials: Google OAuth credentials; their ``scopes`` are passed
            to ``_get_thread`` so it can choose the response format.
        **kwargs: Forwarded to ``GmailReadBlock._build_service``.

    Yields:
        ``("thread", thread)`` where ``thread`` is whatever ``_get_thread``
        returns.
    """
    service = GmailReadBlock._build_service(credentials, **kwargs)
    # `_get_thread` performs a synchronous HTTP request; run it in a worker
    # thread (as GmailReadBlock.run does for `_read_emails`) so the event
    # loop is not blocked while waiting on the Gmail API.
    thread = await asyncio.to_thread(
        self._get_thread, service, input_data.threadId, credentials.scopes
    )
    yield "thread", thread
def _get_thread(self, service, thread_id: str, scopes: list[str] | None) -> dict:
    """Fetch a Gmail thread and replace its messages with parsed email dicts.

    Args:
        service: Gmail API client exposing ``users().threads().get(...)``.
        thread_id: ID of the thread to fetch.
        scopes: OAuth scopes granted to the credentials, or ``None``. When
            the ``gmail.metadata`` scope is present the thread is requested
            with ``format="metadata"``; otherwise ``format="full"``.

    Returns:
        The API response dict, with ``"messages"`` replaced by a list of
        ``Email.model_dump()`` dicts. (The value is a plain dict, not a
        ``Thread`` instance.)
    """
    metadata_scope = "https://www.googleapis.com/auth/gmail.metadata"
    granted = {s.lower() for s in (scopes or [])}
    format_type = "metadata" if metadata_scope in granted else "full"

    thread = (
        service.users()
        .threads()
        .get(userId="me", id=thread_id, format=format_type)
        .execute()
    )

    parsed_messages = []
    for msg in thread.get("messages", []):
        # Header names are lower-cased so lookups below are case-insensitive.
        headers = {
            h["name"].lower(): h["value"]
            for h in msg.get("payload", {}).get("headers", [])
        }
        email = Email(
            threadId=msg.get("threadId", thread_id),
            id=msg["id"],
            subject=headers.get("subject", "No Subject"),
            snippet=msg.get("snippet", ""),
            # parseaddr keeps only the bare address of the first mailbox.
            from_=parseaddr(headers.get("from", ""))[1],
            to=parseaddr(headers.get("to", ""))[1],
            date=headers.get("date", ""),
            body=self._get_email_body(msg),
            sizeEstimate=msg.get("sizeEstimate", 0),
            attachments=self._get_attachments(service, msg),
        )
        parsed_messages.append(email.model_dump())

    thread["messages"] = parsed_messages
    return thread
def _get_email_body(self, msg):
payload = msg.get("payload")
if not payload:
return "This email does not contain a text body."
if "parts" in payload:
for part in payload["parts"]:
if part.get("mimeType") == "text/plain" and "data" in part.get(
"body", {}
):
return base64.urlsafe_b64decode(part["body"]["data"]).decode(
"utf-8"
)
elif payload.get("mimeType") == "text/plain" and "data" in payload.get(
"body", {}
):
return base64.urlsafe_b64decode(payload["body"]["data"]).decode("utf-8")
return "This email does not contain a text body."
def _get_attachments(self, service, message):
attachments = []
if "parts" in message["payload"]:
for part in message["payload"]["parts"]:
if part.get("filename"):
attachment = Attachment(
filename=part["filename"],
content_type=part["mimeType"],
size=int(part["body"].get("size", 0)),
attachment_id=part["body"]["attachmentId"],
)
attachments.append(attachment)
return attachments
# Block that sends a reply inside an existing Gmail thread.
class GmailReplyBlock(Block):
    # Inputs: credentials, the thread/parent message to reply to, optional
    # explicit recipients and subject, the body, and optional attachments.
    class Input(BlockSchema):
        # Replying first reads the parent message's headers through
        # users.messages.get, which `gmail.send` alone does not authorize,
        # so a read scope is requested alongside the send scope.
        credentials: GoogleCredentialsInput = GoogleCredentialsField(
            [
                "https://www.googleapis.com/auth/gmail.send",
                "https://www.googleapis.com/auth/gmail.readonly",
            ]
        )
        threadId: str = SchemaField(description="Thread ID to reply in")
        parentMessageId: str = SchemaField(
            description="ID of the message being replied to"
        )
        to: list[str] = SchemaField(description="To recipients", default_factory=list)
        cc: list[str] = SchemaField(description="CC recipients", default_factory=list)
        bcc: list[str] = SchemaField(description="BCC recipients", default_factory=list)
        # Only consulted when to/cc/bcc are all empty.
        replyAll: bool = SchemaField(
            description="Reply to all original recipients", default=False
        )
        # Empty means "derive the subject from the parent message".
        subject: str = SchemaField(description="Email subject", default="")
        body: str = SchemaField(description="Email body")
        attachments: list[MediaFileType] = SchemaField(
            description="Files to attach", default_factory=list, advanced=True
        )

    # Outputs: identifiers of the sent message plus the raw API response.
    class Output(BlockSchema):
        messageId: str = SchemaField(description="Sent message ID")
        threadId: str = SchemaField(description="Thread ID")
        message: dict = SchemaField(description="Raw Gmail message object")
        error: str = SchemaField(description="Error message if any")
def __init__(self):
    """Register block metadata, schemas, and the self-test fixtures."""
    sample_input = {
        "threadId": "t1",
        "parentMessageId": "m1",
        "body": "Thanks",
        "replyAll": False,
        "credentials": TEST_CREDENTIALS_INPUT,
    }
    expected_output = [
        ("messageId", "m2"),
        ("threadId", "t1"),
        ("message", {"id": "m2", "threadId": "t1"}),
    ]
    # Stand-in for `_reply` during the block self-test; returns a new dict
    # on every call.
    mocks = {"_reply": lambda *args, **kwargs: {"id": "m2", "threadId": "t1"}}

    super().__init__(
        id="12bf5a24-9b90-4f40-9090-4e86e6995e60",
        description="Reply to a Gmail thread",
        categories={BlockCategory.COMMUNICATION},
        input_schema=GmailReplyBlock.Input,
        output_schema=GmailReplyBlock.Output,
        disabled=not GOOGLE_OAUTH_IS_CONFIGURED,
        test_input=sample_input,
        test_credentials=TEST_CREDENTIALS,
        test_output=expected_output,
        test_mock=mocks,
    )
async def run(
    self,
    input_data: Input,
    *,
    credentials: GoogleCredentials,
    graph_exec_id: str,
    **kwargs,
) -> BlockOutput:
    """Send the reply and yield the sent message's identifiers.

    Yields ``messageId``, then ``threadId`` (falling back to the requested
    thread ID when the response omits it), then the whole response as
    ``message``.
    """
    gmail = GmailReadBlock._build_service(credentials, **kwargs)
    sent = await self._reply(gmail, input_data, graph_exec_id)

    thread_id = sent.get("threadId", input_data.threadId)
    yield "messageId", sent["id"]
    yield "threadId", thread_id
    yield "message", sent
async def _reply(self, service, input_data: Input, graph_exec_id: str) -> dict:
    """Build a threaded reply to ``parentMessageId`` and send it.

    Args:
        service: Gmail API client exposing ``users().messages()``.
        input_data: Block input (thread/parent IDs, recipients, subject,
            body, attachments). It is read only, never modified.
        graph_exec_id: Execution ID used to store and locate attachment files.

    Returns:
        The response dict of ``users.messages.send``.

    Recipient defaults (only when ``to``, ``cc`` and ``bcc`` are all empty):
        * ``replyAll`` false: the parent's ``Reply-To`` address, falling
          back to ``From``.
        * ``replyAll`` true: the ``Reply-To``/``From`` address(es) plus every
          address in the parent's ``To`` and ``Cc``, de-duplicated in order.
          NOTE(review): this can include the authenticated user's own
          address; filtering it would need a profile lookup.
    """
    from email import encoders
    from email.mime.base import MIMEBase
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    parent_request = (
        service.users()
        .messages()
        .get(
            userId="me",
            id=input_data.parentMessageId,
            format="metadata",
            metadataHeaders=[
                "Subject",
                "References",
                "Message-ID",
                "From",
                "To",
                "Cc",
                "Reply-To",
            ],
        )
    )
    # The Google client is synchronous; run the HTTP call in a worker
    # thread so this coroutine does not block the event loop.
    parent = await asyncio.to_thread(parent_request.execute)
    headers = {
        h["name"].lower(): h["value"]
        for h in parent.get("payload", {}).get("headers", [])
    }

    # Work on a copy: the caller's input object must not be changed.
    to_recipients = list(input_data.to)
    if not (input_data.to or input_data.cc or input_data.bcc):
        # Reply-To, when present and non-empty, overrides From in both modes.
        reply_target = headers.get("reply-to") or headers.get("from", "")
        if input_data.replyAll:
            candidates = [
                addr
                for _, addr in getaddresses(
                    [reply_target, headers.get("to", ""), headers.get("cc", "")]
                )
            ]
            # dict.fromkeys de-duplicates while preserving first-seen order.
            to_recipients = list(dict.fromkeys(a for a in candidates if a))
        else:
            sender = parseaddr(reply_target)[1]
            to_recipients = [sender] if sender else []

    subject = input_data.subject
    if not subject:
        parent_subject = headers.get("subject", "").strip()
        # Avoid stacking prefixes ("Re: Re: ...") when the parent is
        # already a reply.
        if parent_subject.lower().startswith("re:"):
            subject = parent_subject
        else:
            subject = f"Re: {parent_subject}".strip()

    references = headers.get("references", "").split()
    if headers.get("message-id"):
        references.append(headers["message-id"])

    msg = MIMEMultipart()
    if to_recipients:
        msg["To"] = ", ".join(to_recipients)
    if input_data.cc:
        msg["Cc"] = ", ".join(input_data.cc)
    if input_data.bcc:
        msg["Bcc"] = ", ".join(input_data.bcc)
    msg["Subject"] = subject
    if headers.get("message-id"):
        msg["In-Reply-To"] = headers["message-id"]
    if references:
        msg["References"] = " ".join(references)

    # Heuristic kept from the original: any '<' marks the body as HTML.
    msg.attach(
        MIMEText(input_data.body, "html" if "<" in input_data.body else "plain")
    )

    for attach in input_data.attachments:
        local_path = await store_media_file(
            graph_exec_id, attach, return_content=False
        )
        abs_path = Path(get_exec_file_path(graph_exec_id, local_path))
        part = MIMEBase("application", "octet-stream")
        part.set_payload(abs_path.read_bytes())
        encoders.encode_base64(part)
        # Passing filename as a parameter lets the email package quote or
        # encode it; interpolating it raw breaks on spaces, quotes and
        # non-ASCII characters.
        part.add_header("Content-Disposition", "attachment", filename=abs_path.name)
        msg.attach(part)

    raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("utf-8")
    send_request = (
        service.users()
        .messages()
        .send(userId="me", body={"threadId": input_data.threadId, "raw": raw})
    )
    return await asyncio.to_thread(send_request.execute)

View File

@@ -100,6 +100,8 @@ Below is a comprehensive list of all available blocks, categorized by their prim
| Block Name | Description |
|------------|-------------|
| [Gmail Read](google/gmail.md#gmail-read) | Retrieves and reads emails from a Gmail account |
| [Gmail Get Thread](google/gmail.md#gmail-get-thread) | Returns every message in a Gmail thread |
| [Gmail Reply](google/gmail.md#gmail-reply) | Sends a reply that stays in the same thread |
| [Gmail Send](google/gmail.md#gmail-send) | Sends emails using a Gmail account |
| [Gmail List Labels](google/gmail.md#gmail-list-labels) | Retrieves all labels from a Gmail account |
| [Gmail Add Label](google/gmail.md#gmail-add-label) | Adds a label to a specific email in a Gmail account |

View File

@@ -15,13 +15,13 @@ The block connects to the user's Gmail account using their credentials, performs
| Input | Description |
|-------|-------------|
| Credentials | The user's Gmail account credentials for authentication |
| Query | A search query to filter emails (e.g., "is:unread" for unread emails) |
| Query | A search query to filter emails (e.g., "is:unread" for unread emails). Ignored if using only the `gmail.metadata` scope. |
| Max Results | The maximum number of emails to retrieve |
### Outputs
| Output | Description |
|--------|-------------|
| Email | Detailed information about a single email |
| Email | Detailed information about a single email (now includes `threadId`) |
| Emails | A list of email data for multiple emails |
| Error | An error message if something goes wrong during the process |
@@ -141,4 +141,63 @@ The block first finds the ID of the specified label in the user's Gmail account.
| Error | An error message if something goes wrong during the process |
### Possible use case
Automatically removing the "Unread" label from emails after they have been processed by a customer service representative.
Automatically removing the "Unread" label from emails after they have been processed by a customer service representative.
---
## Gmail Get Thread
### What it is
A block that retrieves an entire Gmail thread.
### What it does
Given a `threadId`, this block fetches all messages in that thread and decodes the text bodies.
### Inputs
| Input | Description |
|-------|-------------|
| Credentials | The user's Gmail account credentials for authentication |
| threadId | The ID of the thread to fetch |
### Outputs
| Output | Description |
|--------|-------------|
| Thread | Gmail thread with decoded messages |
| Error | An error message if something goes wrong |
### Possible use case
Checking if a recipient replied in an existing conversation.
---
## Gmail Reply
### What it is
A block that sends a reply within an existing Gmail thread.
### What it does
This block builds a properly formatted reply email and sends it so Gmail keeps it in the same conversation.
### Inputs
| Input | Description |
|-------|-------------|
| Credentials | The user's Gmail account credentials for authentication |
| threadId | The thread to reply in |
| parentMessageId | The ID of the message you are replying to |
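| To | List of recipients. If To, Cc and Bcc are all empty, the reply is addressed to the parent message's `Reply-To` address (falling back to `From`) |
| Cc | List of CC recipients |
| Bcc | List of BCC recipients |
| replyAll | When To, Cc and Bcc are all empty, reply to the original sender and every original To/Cc recipient instead of only the sender (default: off) |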
| Subject | Optional subject (defaults to `Re:` prefix) |
| Body | The email body |
| Attachments | Optional files to include |
### Outputs
| Output | Description |
|--------|-------------|
| MessageId | The ID of the sent message |
| ThreadId | The thread the reply belongs to |
| Message | The raw Gmail API response for the sent message |
| Error | Error message if something goes wrong |
### Possible use case
Automatically respond "Thanks, see you then" to a scheduling email while keeping the conversation tidy.