Compare commits

..

22 Commits

Author SHA1 Message Date
Abhimanyu Yadav
c6e7a28325 Merge branch 'dev' into abhi/add-agent-mail-blocks 2026-03-17 11:55:10 +05:30
abhi1992002
79dcfed31b fix(blocks): remove duplicate agentmail entry in pyproject.toml 2026-03-17 11:04:33 +05:30
abhi1992002
404c0fdc0d fix(blocks): remove unneeded __init__.py from agent_mail package
Per reviewer feedback — the file was empty and not required.
2026-03-17 11:00:19 +05:30
abhi1992002
cf1ba67d86 refactor(blocks): use AsyncAgentMail client in all agent mail blocks
Replace synchronous AgentMail client with AsyncAgentMail to avoid
blocking the event loop during HTTP calls in async run() methods.
2026-03-17 11:00:19 +05:30
abhi1992002
c3855c0910 Mark agent mail blocks as sensitive actions 2026-03-17 11:00:19 +05:30
abhi1992002
cd0ff8d386 Fix shallow copy issue in agent mail thread blocks
Create explicit copies of thread dictionaries to prevent unintended
mutations of the original objects.
2026-03-17 11:00:19 +05:30
abhi1992002
5670938d42 Fix AgentMail block message handling and list entry validation
- Only include reason parameter for block list types in list creation
- Replace raw thread messages with processed messages in result dict
2026-03-17 11:00:19 +05:30
abhi1992002
f6ccc8cac7 Update Agent Mail documentation for accuracy
- Clarify that Agent Mail Forward Message supports multiple recipients
- Fix type annotations in Update Draft parameters (remove Optional
  wrapper)
2026-03-17 11:00:19 +05:30
abhi1992002
cd3cad1fef Fix null handling in agent mail blocks
Use `None or ""` pattern instead of empty string defaults to
properly handle null/None values returned from the mail service.
2026-03-17 11:00:19 +05:30
abhi1992002
c0ebb5c719 Add CC/BCC support to forward message block
Extends the Agent Mail forward message block to support multiple
recipients with CC and BCC fields. Adds validation to limit total
recipients to 50 across all fields. Updates documentation to reflect
the new functionality.
2026-03-17 11:00:19 +05:30
abhi1992002
1c6cfa5e78 Improve AgentMail documentation with detailed explanations
Add comprehensive "How it works" and "Possible use case" sections for
all inbox-related blocks. Update the Update Draft parameters to clarify
that omitting optional fields preserves current values and mark them as
Optional in the type column.
2026-03-17 11:00:19 +05:30
abhi1992002
a3651ad682 Improve AgentMail block error handling and type safety
- Add try-catch blocks to attachment operations with error output
- Handle both bytes and string attachment data types properly
- Fix count calculations using walrus operator to distinguish None from
  0
- Change recipient fields (to, cc, bcc) from string to list types for
  multiple recipients
- Fix positional argument to keyword argument in pod inboxes list call
- Update documentation with technical explanations and
2026-03-17 11:00:18 +05:30
abhi1992002
78ef378ca8 Remove unused BlockCostType import and base cost configuration 2026-03-17 10:59:48 +05:30
abhi1992002
45897957a5 Format agent mail blocks to 50 character line limit 2026-03-17 10:59:48 +05:30
abhi1992002
553cac94ca Standardize AgentMail block naming in documentation
The changes standardize the naming convention for AgentMail-related
blocks throughout the documentation. Block names are now consistently
formatted as "Agent Mail" (two words) instead of "AgentMail" (one word),
and block references are updated to use kebab-case anchors (e.g.,
`#agent-mail-send-message`). Descriptions and other documentation fields
have been streamlined to placeholder text where needed, and outputs are
now consistently ordered with error fields first. The SUMMARY.md
navigation has been reordered alphabetically to place Agent Mail blocks
at the top.
2026-03-17 10:59:48 +05:30
abhi1992002
00945ae33b Fix null handling in agent mail list blocks
Replace unsafe `getattr` defaults with proper null coalescing to ensure
count falls back to list length and next_page_token defaults to empty
string even when attributes exist with falsy values.
2026-03-17 10:59:48 +05:30
abhi1992002
d2d599c89b Add AgentMail block documentation
Adds comprehensive documentation for 40+ AgentMail blocks across 7
categories: messages, threads, drafts, inbox management, attachments,
allow/block lists, and pod-based multi-tenancy. Updates integration
index and summary to reference new block documentation files.
2026-03-17 10:59:48 +05:30
abhi1992002
ed0ca6e102 Add AgentMail attachment, list, and pod management blocks 2026-03-17 10:59:48 +05:30
abhi1992002
669c73ff0e Add AgentMail draft management blocks
Implements six new blocks for creating, retrieving, listing, updating,
sending, and deleting email drafts in AgentMail inboxes, including
org-wide draft listing and scheduled send support.
2026-03-17 10:59:48 +05:30
abhi1992002
68e346f1cb Add AgentMail thread management blocks
Implement five new blocks for managing email threads in AgentMail:
- List and get threads within specific inboxes
- Delete threads from inboxes
- List and get threads org-wide across all inboxes
2026-03-17 10:59:48 +05:30
abhi1992002
556198afb7 Add AgentMail message blocks 2026-03-17 10:59:48 +05:30
abhi1992002
0d21f9b19f Add AgentMail inbox management blocks
Add blocks for creating, retrieving, listing, updating, and deleting
AgentMail inboxes. Includes shared provider configuration and mock
credentials for testing.
2026-03-17 10:59:48 +05:30
23 changed files with 4096 additions and 356 deletions

View File

@@ -178,16 +178,6 @@ yield "image_url", result_url
3. Write tests alongside the route file
4. Run `poetry run test` to verify
## Workspace & Media Files
**Read [Workspace & Media Architecture](../../docs/platform/workspace-media-architecture.md) when:**
- Working on CoPilot file upload/download features
- Building blocks that handle `MediaFileType` inputs/outputs
- Modifying `WorkspaceManager` or `store_media_file()`
- Debugging file persistence or virus scanning issues
Covers: `WorkspaceManager` (persistent storage with session scoping), `store_media_file()` (media normalization pipeline), and responsibility boundaries for virus scanning and persistence.
## Security Implementation
### Cache Protection Middleware

View File

@@ -0,0 +1,19 @@
"""
Shared configuration for all AgentMail blocks.
"""
from backend.sdk import APIKeyCredentials, ProviderBuilder, SecretStr
agent_mail = (
ProviderBuilder("agent_mail")
.with_api_key("AGENTMAIL_API_KEY", "AgentMail API Key")
.build()
)
TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef",
provider="agent_mail",
title="Mock AgentMail API Key",
api_key=SecretStr("mock-agentmail-api-key"),
expires_at=None,
)

View File

@@ -0,0 +1,161 @@
"""
AgentMail Attachment blocks — download file attachments from messages and threads.
Attachments are files associated with messages (PDFs, CSVs, images, etc.).
To send attachments, include them in the attachments parameter when using
AgentMailSendMessageBlock or AgentMailReplyToMessageBlock.
To download, first get the attachment_id from a message's attachments array,
then use these blocks to retrieve the file content as base64.
"""
import base64
from agentmail import AsyncAgentMail
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
CredentialsMetaInput,
SchemaField,
)
from ._config import agent_mail
def _client(credentials: APIKeyCredentials) -> AsyncAgentMail:
return AsyncAgentMail(api_key=credentials.api_key.get_secret_value())
class AgentMailGetMessageAttachmentBlock(Block):
"""
Download a file attachment from a specific email message.
Retrieves the raw file content and returns it as base64-encoded data.
First get the attachment_id from a message object's attachments array,
then use this block to download the file.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address the message belongs to"
)
message_id: str = SchemaField(
description="Message ID containing the attachment"
)
attachment_id: str = SchemaField(
description="Attachment ID to download (from the message's attachments array)"
)
class Output(BlockSchemaOutput):
content_base64: str = SchemaField(
description="File content encoded as a base64 string. Decode with base64.b64decode() to get raw bytes."
)
attachment_id: str = SchemaField(
description="The attachment ID that was downloaded"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="a283ffc4-8087-4c3d-9135-8f26b86742ec",
description="Download a file attachment from an email message. Returns base64-encoded file content.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
try:
client = _client(credentials)
data = await client.inboxes.messages.get_attachment(
inbox_id=input_data.inbox_id,
message_id=input_data.message_id,
attachment_id=input_data.attachment_id,
)
if isinstance(data, bytes):
encoded = base64.b64encode(data).decode()
elif isinstance(data, str):
encoded = base64.b64encode(data.encode("utf-8")).decode()
else:
raise TypeError(
f"Unexpected attachment data type: {type(data).__name__}"
)
yield "content_base64", encoded
yield "attachment_id", input_data.attachment_id
except Exception as e:
yield "error", str(e)
class AgentMailGetThreadAttachmentBlock(Block):
"""
Download a file attachment from a conversation thread.
Same as GetMessageAttachment but looks up by thread ID instead of
message ID. Useful when you know the thread but not the specific
message containing the attachment.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address the thread belongs to"
)
thread_id: str = SchemaField(description="Thread ID containing the attachment")
attachment_id: str = SchemaField(
description="Attachment ID to download (from a message's attachments array within the thread)"
)
class Output(BlockSchemaOutput):
content_base64: str = SchemaField(
description="File content encoded as a base64 string. Decode with base64.b64decode() to get raw bytes."
)
attachment_id: str = SchemaField(
description="The attachment ID that was downloaded"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="06b6a4c4-9d71-4992-9e9c-cf3b352763b5",
description="Download a file attachment from a conversation thread. Returns base64-encoded file content.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
try:
client = _client(credentials)
data = await client.inboxes.threads.get_attachment(
inbox_id=input_data.inbox_id,
thread_id=input_data.thread_id,
attachment_id=input_data.attachment_id,
)
if isinstance(data, bytes):
encoded = base64.b64encode(data).decode()
elif isinstance(data, str):
encoded = base64.b64encode(data.encode("utf-8")).decode()
else:
raise TypeError(
f"Unexpected attachment data type: {type(data).__name__}"
)
yield "content_base64", encoded
yield "attachment_id", input_data.attachment_id
except Exception as e:
yield "error", str(e)

View File

@@ -0,0 +1,501 @@
"""
AgentMail Draft blocks — create, get, list, update, send, and delete drafts.
A Draft is an unsent message that can be reviewed, edited, and sent later.
Drafts enable human-in-the-loop review, scheduled sending (via send_at),
and complex multi-step email composition workflows.
"""
from typing import Optional
from agentmail import AsyncAgentMail
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
CredentialsMetaInput,
SchemaField,
)
from ._config import agent_mail
def _client(credentials: APIKeyCredentials) -> AsyncAgentMail:
return AsyncAgentMail(api_key=credentials.api_key.get_secret_value())
class AgentMailCreateDraftBlock(Block):
"""
Create a draft email in an AgentMail inbox for review or scheduled sending.
Drafts let agents prepare emails without sending immediately. Use send_at
to schedule automatic sending at a future time (ISO 8601 format).
Scheduled drafts are auto-labeled 'scheduled' and can be cancelled by
deleting the draft.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address to create the draft in"
)
to: list[str] = SchemaField(
description="Recipient email addresses (e.g. ['user@example.com'])"
)
subject: str = SchemaField(description="Email subject line", default="")
text: str = SchemaField(description="Plain text body of the draft", default="")
html: str = SchemaField(
description="Rich HTML body of the draft", default="", advanced=True
)
cc: list[str] = SchemaField(
description="CC recipient email addresses",
default_factory=list,
advanced=True,
)
bcc: list[str] = SchemaField(
description="BCC recipient email addresses",
default_factory=list,
advanced=True,
)
in_reply_to: str = SchemaField(
description="Message ID this draft replies to, for threading follow-up drafts",
default="",
advanced=True,
)
send_at: str = SchemaField(
description="Schedule automatic sending at this ISO 8601 datetime (e.g. '2025-01-15T09:00:00Z'). Leave empty for manual send.",
default="",
advanced=True,
)
class Output(BlockSchemaOutput):
draft_id: str = SchemaField(
description="Unique identifier of the created draft"
)
send_status: str = SchemaField(
description="'scheduled' if send_at was set, empty otherwise. Values: scheduled, sending, failed.",
default="",
)
result: dict = SchemaField(
description="Complete draft object with all metadata"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="25ac9086-69fd-48b8-b910-9dbe04b8f3bd",
description="Create a draft email for review or scheduled sending. Use send_at for automatic future delivery.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"to": input_data.to}
if input_data.subject:
params["subject"] = input_data.subject
if input_data.text:
params["text"] = input_data.text
if input_data.html:
params["html"] = input_data.html
if input_data.cc:
params["cc"] = input_data.cc
if input_data.bcc:
params["bcc"] = input_data.bcc
if input_data.in_reply_to:
params["in_reply_to"] = input_data.in_reply_to
if input_data.send_at:
params["send_at"] = input_data.send_at
draft = await client.inboxes.drafts.create(input_data.inbox_id, **params)
result = draft.__dict__ if hasattr(draft, "__dict__") else {}
yield "draft_id", draft.draft_id
yield "send_status", getattr(draft, "send_status", None) or ""
yield "result", result
class AgentMailGetDraftBlock(Block):
"""
Retrieve a specific draft from an AgentMail inbox.
Returns the draft contents including recipients, subject, body, and
scheduled send status. Use this to review a draft before approving it.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address the draft belongs to"
)
draft_id: str = SchemaField(description="Draft ID to retrieve")
class Output(BlockSchemaOutput):
draft_id: str = SchemaField(description="Unique identifier of the draft")
subject: str = SchemaField(description="Draft subject line", default="")
send_status: str = SchemaField(
description="Scheduled send status: 'scheduled', 'sending', 'failed', or empty",
default="",
)
send_at: str = SchemaField(
description="Scheduled send time (ISO 8601) if set", default=""
)
result: dict = SchemaField(description="Complete draft object with all fields")
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="8e57780d-dc25-43d4-a0f4-1f02877b09fb",
description="Retrieve a draft email to review its contents, recipients, and scheduled send status.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
draft = await client.inboxes.drafts.get(
inbox_id=input_data.inbox_id,
draft_id=input_data.draft_id,
)
result = draft.__dict__ if hasattr(draft, "__dict__") else {}
yield "draft_id", draft.draft_id
yield "subject", getattr(draft, "subject", None) or ""
yield "send_status", getattr(draft, "send_status", None) or ""
yield "send_at", getattr(draft, "send_at", None) or ""
yield "result", result
class AgentMailListDraftsBlock(Block):
"""
List all drafts in an AgentMail inbox with optional label filtering.
Use labels=['scheduled'] to find all drafts queued for future sending.
Useful for building approval dashboards or monitoring pending outreach.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address to list drafts from"
)
limit: int = SchemaField(
description="Maximum number of drafts to return per page (1-100)",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page",
default="",
advanced=True,
)
labels: list[str] = SchemaField(
description="Filter drafts by labels (e.g. ['scheduled'] for pending sends)",
default_factory=list,
advanced=True,
)
class Output(BlockSchemaOutput):
drafts: list[dict] = SchemaField(
description="List of draft objects with subject, recipients, send_status, etc."
)
count: int = SchemaField(description="Number of drafts returned")
next_page_token: str = SchemaField(
description="Token for the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="e84883b7-7c39-4c5c-88e8-0a72b078ea63",
description="List drafts in an AgentMail inbox. Filter by labels=['scheduled'] to find pending sends.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
if input_data.labels:
params["labels"] = input_data.labels
response = await client.inboxes.drafts.list(input_data.inbox_id, **params)
drafts = [
d.__dict__ if hasattr(d, "__dict__") else d
for d in getattr(response, "drafts", [])
]
yield "drafts", drafts
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(drafts)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""
class AgentMailUpdateDraftBlock(Block):
"""
Update an existing draft's content, recipients, or scheduled send time.
Use this to reschedule a draft (change send_at), modify recipients,
or edit the subject/body before sending. To cancel a scheduled send,
delete the draft instead.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address the draft belongs to"
)
draft_id: str = SchemaField(description="Draft ID to update")
to: Optional[list[str]] = SchemaField(
description="Updated recipient email addresses (replaces existing list). Omit to keep current value.",
default=None,
)
subject: Optional[str] = SchemaField(
description="Updated subject line. Omit to keep current value.",
default=None,
)
text: Optional[str] = SchemaField(
description="Updated plain text body. Omit to keep current value.",
default=None,
)
html: Optional[str] = SchemaField(
description="Updated HTML body. Omit to keep current value.",
default=None,
advanced=True,
)
send_at: Optional[str] = SchemaField(
description="Reschedule: new ISO 8601 send time (e.g. '2025-01-20T14:00:00Z'). Omit to keep current value.",
default=None,
advanced=True,
)
class Output(BlockSchemaOutput):
draft_id: str = SchemaField(description="The updated draft ID")
send_status: str = SchemaField(description="Updated send status", default="")
result: dict = SchemaField(description="Complete updated draft object")
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="351f6e51-695a-421a-9032-46a587b10336",
description="Update a draft's content, recipients, or scheduled send time. Use to reschedule or edit before sending.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {}
if input_data.to is not None:
params["to"] = input_data.to
if input_data.subject is not None:
params["subject"] = input_data.subject
if input_data.text is not None:
params["text"] = input_data.text
if input_data.html is not None:
params["html"] = input_data.html
if input_data.send_at is not None:
params["send_at"] = input_data.send_at
draft = await client.inboxes.drafts.update(
inbox_id=input_data.inbox_id,
draft_id=input_data.draft_id,
**params,
)
result = draft.__dict__ if hasattr(draft, "__dict__") else {}
yield "draft_id", draft.draft_id
yield "send_status", getattr(draft, "send_status", None) or ""
yield "result", result
class AgentMailSendDraftBlock(Block):
"""
Send a draft immediately, converting it into a delivered message.
The draft is deleted after successful sending and becomes a regular
message with a message_id. Use this for human-in-the-loop approval
workflows: agent creates draft, human reviews, then this block sends it.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address the draft belongs to"
)
draft_id: str = SchemaField(description="Draft ID to send now")
class Output(BlockSchemaOutput):
message_id: str = SchemaField(
description="Message ID of the now-sent email (draft is deleted)"
)
thread_id: str = SchemaField(
description="Thread ID the sent message belongs to"
)
result: dict = SchemaField(description="Complete sent message object")
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="37c39e83-475d-4b3d-843a-d923d001b85a",
description="Send a draft immediately, converting it into a delivered message. The draft is deleted after sending.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
is_sensitive_action=True,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
msg = await client.inboxes.drafts.send(
inbox_id=input_data.inbox_id,
draft_id=input_data.draft_id,
)
result = msg.__dict__ if hasattr(msg, "__dict__") else {}
yield "message_id", msg.message_id
yield "thread_id", getattr(msg, "thread_id", None) or ""
yield "result", result
class AgentMailDeleteDraftBlock(Block):
"""
Delete a draft from an AgentMail inbox. Also cancels any scheduled send.
If the draft was scheduled with send_at, deleting it cancels the
scheduled delivery. This is the way to cancel a scheduled email.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address the draft belongs to"
)
draft_id: str = SchemaField(
description="Draft ID to delete (also cancels scheduled sends)"
)
class Output(BlockSchemaOutput):
success: bool = SchemaField(
description="True if the draft was successfully deleted/cancelled"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="9023eb99-3e2f-4def-808b-d9c584b3d9e7",
description="Delete a draft or cancel a scheduled email. Removes the draft permanently.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
is_sensitive_action=True,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
await client.inboxes.drafts.delete(
inbox_id=input_data.inbox_id,
draft_id=input_data.draft_id,
)
yield "success", True
class AgentMailListOrgDraftsBlock(Block):
"""
List all drafts across every inbox in your organization.
Returns drafts from all inboxes in one query. Perfect for building
a central approval dashboard where a human supervisor can review
and approve any draft created by any agent.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
limit: int = SchemaField(
description="Maximum number of drafts to return per page (1-100)",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page",
default="",
advanced=True,
)
class Output(BlockSchemaOutput):
drafts: list[dict] = SchemaField(
description="List of draft objects from all inboxes in the organization"
)
count: int = SchemaField(description="Number of drafts returned")
next_page_token: str = SchemaField(
description="Token for the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="ed7558ae-3a07-45f5-af55-a25fe88c9971",
description="List all drafts across every inbox in your organization. Use for central approval dashboards.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
response = await client.drafts.list(**params)
drafts = [
d.__dict__ if hasattr(d, "__dict__") else d
for d in getattr(response, "drafts", [])
]
yield "drafts", drafts
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(drafts)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""

View File

@@ -0,0 +1,302 @@
"""
AgentMail Inbox blocks — create, get, list, update, and delete inboxes.
An Inbox is a fully programmable email account for AI agents. Each inbox gets
a unique email address and can send, receive, and manage emails via the
AgentMail API. You can create thousands of inboxes on demand.
"""
from agentmail import AsyncAgentMail
from agentmail.inboxes.types import CreateInboxRequest
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
CredentialsMetaInput,
SchemaField,
)
from ._config import agent_mail
def _client(credentials: APIKeyCredentials) -> AsyncAgentMail:
return AsyncAgentMail(api_key=credentials.api_key.get_secret_value())
class AgentMailCreateInboxBlock(Block):
"""
Create a new email inbox for an AI agent via AgentMail.
Each inbox gets a unique email address (e.g. username@agentmail.to).
If username and domain are not provided, AgentMail auto-generates them.
Use custom domains by specifying the domain field.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
username: str = SchemaField(
description="Local part of the email address (e.g. 'support' for support@domain.com). Leave empty to auto-generate.",
default="",
advanced=False,
)
domain: str = SchemaField(
description="Email domain (e.g. 'mydomain.com'). Defaults to agentmail.to if empty.",
default="",
advanced=False,
)
display_name: str = SchemaField(
description="Friendly name shown in the 'From' field of sent emails (e.g. 'Support Agent')",
default="",
advanced=False,
)
class Output(BlockSchemaOutput):
inbox_id: str = SchemaField(
description="Unique identifier for the created inbox (also the email address)"
)
email_address: str = SchemaField(
description="Full email address of the inbox (e.g. support@agentmail.to)"
)
result: dict = SchemaField(
description="Complete inbox object with all metadata"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="7a8ac219-c6ec-4eec-a828-81af283ce04c",
description="Create a new email inbox for an AI agent via AgentMail. Each inbox gets a unique address and can send/receive emails.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {}
if input_data.username:
params["username"] = input_data.username
if input_data.domain:
params["domain"] = input_data.domain
if input_data.display_name:
params["display_name"] = input_data.display_name
inbox = await client.inboxes.create(request=CreateInboxRequest(**params))
result = inbox.__dict__ if hasattr(inbox, "__dict__") else {}
yield "inbox_id", inbox.inbox_id
yield "email_address", getattr(inbox, "email_address", inbox.inbox_id)
yield "result", result
class AgentMailGetInboxBlock(Block):
"""
Retrieve details of an existing AgentMail inbox by its ID or email address.
Returns the inbox metadata including email address, display name, and
configuration. Use this to check if an inbox exists or get its properties.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address to look up (e.g. 'support@agentmail.to')"
)
class Output(BlockSchemaOutput):
inbox_id: str = SchemaField(description="Unique identifier of the inbox")
email_address: str = SchemaField(description="Full email address of the inbox")
display_name: str = SchemaField(
description="Friendly name shown in the 'From' field", default=""
)
result: dict = SchemaField(
description="Complete inbox object with all metadata"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="b858f62b-6c12-4736-aaf2-dbc5a9281320",
description="Retrieve details of an existing AgentMail inbox including its email address, display name, and configuration.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
inbox = await client.inboxes.get(inbox_id=input_data.inbox_id)
result = inbox.__dict__ if hasattr(inbox, "__dict__") else {}
yield "inbox_id", inbox.inbox_id
yield "email_address", getattr(inbox, "email_address", inbox.inbox_id)
yield "display_name", getattr(inbox, "display_name", None) or ""
yield "result", result
class AgentMailListInboxesBlock(Block):
"""
List all email inboxes in your AgentMail organization.
Returns a paginated list of all inboxes with their metadata.
Use page_token for pagination when you have many inboxes.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
limit: int = SchemaField(
description="Maximum number of inboxes to return per page (1-100)",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page of results",
default="",
advanced=True,
)
class Output(BlockSchemaOutput):
inboxes: list[dict] = SchemaField(
description="List of inbox objects, each containing inbox_id, email_address, display_name, etc."
)
count: int = SchemaField(
description="Total number of inboxes in your organization"
)
next_page_token: str = SchemaField(
description="Token to pass as page_token to get the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="cfd84a06-2121-4cef-8d14-8badf52d22f0",
description="List all email inboxes in your AgentMail organization with pagination support.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
response = await client.inboxes.list(**params)
inboxes = [
inbox.__dict__ if hasattr(inbox, "__dict__") else inbox
for inbox in getattr(response, "inboxes", [])
]
yield "inboxes", inboxes
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(inboxes)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""
class AgentMailUpdateInboxBlock(Block):
"""
Update the display name of an existing AgentMail inbox.
Changes the friendly name shown in the 'From' field when emails are sent
from this inbox. The email address itself cannot be changed.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address to update (e.g. 'support@agentmail.to')"
)
display_name: str = SchemaField(
description="New display name for the inbox (e.g. 'Customer Support Bot')"
)
class Output(BlockSchemaOutput):
inbox_id: str = SchemaField(description="The updated inbox ID")
result: dict = SchemaField(
description="Complete updated inbox object with all metadata"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="59b49f59-a6d1-4203-94c0-3908adac50b6",
description="Update the display name of an AgentMail inbox. Changes the 'From' name shown when emails are sent.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
inbox = await client.inboxes.update(
inbox_id=input_data.inbox_id,
display_name=input_data.display_name,
)
result = inbox.__dict__ if hasattr(inbox, "__dict__") else {}
yield "inbox_id", inbox.inbox_id
yield "result", result
class AgentMailDeleteInboxBlock(Block):
"""
Permanently delete an AgentMail inbox and all its data.
This removes the inbox, all its messages, threads, and drafts.
This action cannot be undone. The email address will no longer
receive or send emails.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address to permanently delete"
)
class Output(BlockSchemaOutput):
success: bool = SchemaField(
description="True if the inbox was successfully deleted"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="ade970ae-8428-4a7b-9278-b52054dbf535",
description="Permanently delete an AgentMail inbox and all its messages, threads, and drafts. This action cannot be undone.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
is_sensitive_action=True,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
await client.inboxes.delete(inbox_id=input_data.inbox_id)
yield "success", True

View File

@@ -0,0 +1,278 @@
"""
AgentMail List blocks — manage allow/block lists for email filtering.
Lists let you control which email addresses and domains your agents can
send to or receive from. There are four list types based on two dimensions:
direction (send/receive) and type (allow/block).
- receive + allow: Only accept emails from these addresses/domains
- receive + block: Reject emails from these addresses/domains
- send + allow: Only send emails to these addresses/domains
- send + block: Prevent sending emails to these addresses/domains
"""
from enum import Enum
from agentmail import AsyncAgentMail
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
CredentialsMetaInput,
SchemaField,
)
from ._config import agent_mail
def _client(credentials: APIKeyCredentials) -> AsyncAgentMail:
return AsyncAgentMail(api_key=credentials.api_key.get_secret_value())
class ListDirection(str, Enum):
SEND = "send"
RECEIVE = "receive"
class ListType(str, Enum):
ALLOW = "allow"
BLOCK = "block"
class AgentMailListEntriesBlock(Block):
"""
List all entries in an AgentMail allow/block list.
Retrieves email addresses and domains that are currently allowed
or blocked for sending or receiving. Use direction and list_type
to select which of the four lists to query.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
direction: ListDirection = SchemaField(
description="'send' to filter outgoing emails, 'receive' to filter incoming emails"
)
list_type: ListType = SchemaField(
description="'allow' for whitelist (only permit these), 'block' for blacklist (reject these)"
)
limit: int = SchemaField(
description="Maximum number of entries to return per page",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page",
default="",
advanced=True,
)
class Output(BlockSchemaOutput):
entries: list[dict] = SchemaField(
description="List of entries, each with an email address or domain"
)
count: int = SchemaField(description="Number of entries returned")
next_page_token: str = SchemaField(
description="Token for the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="01489100-35da-45aa-8a01-9540ba0e9a21",
description="List all entries in an AgentMail allow/block list. Choose send/receive direction and allow/block type.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
response = await client.lists.list(
input_data.direction.value, input_data.list_type.value, **params
)
entries = [
e.__dict__ if hasattr(e, "__dict__") else e
for e in getattr(response, "entries", [])
]
yield "entries", entries
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(entries)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""
class AgentMailCreateListEntryBlock(Block):
"""
Add an email address or domain to an AgentMail allow/block list.
Entries can be full email addresses (e.g. 'partner@example.com') or
entire domains (e.g. 'example.com'). For block lists, you can optionally
provide a reason (e.g. 'spam', 'competitor').
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
direction: ListDirection = SchemaField(
description="'send' for outgoing email rules, 'receive' for incoming email rules"
)
list_type: ListType = SchemaField(
description="'allow' to whitelist, 'block' to blacklist"
)
entry: str = SchemaField(
description="Email address (user@example.com) or domain (example.com) to add"
)
reason: str = SchemaField(
description="Reason for blocking (only used with block lists, e.g. 'spam', 'competitor')",
default="",
advanced=True,
)
class Output(BlockSchemaOutput):
entry: str = SchemaField(
description="The email address or domain that was added"
)
result: dict = SchemaField(description="Complete entry object")
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="b6650a0a-b113-40cf-8243-ff20f684f9b8",
description="Add an email address or domain to an allow/block list. Block spam senders or whitelist trusted domains.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
is_sensitive_action=True,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"entry": input_data.entry}
if input_data.reason and input_data.list_type == ListType.BLOCK:
params["reason"] = input_data.reason
result = await client.lists.create(
input_data.direction.value, input_data.list_type.value, **params
)
result_dict = result.__dict__ if hasattr(result, "__dict__") else {}
yield "entry", input_data.entry
yield "result", result_dict
class AgentMailGetListEntryBlock(Block):
"""
Check if an email address or domain exists in an AgentMail allow/block list.
Returns the entry details if found. Use this to verify whether a specific
address or domain is currently allowed or blocked.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
direction: ListDirection = SchemaField(
description="'send' for outgoing rules, 'receive' for incoming rules"
)
list_type: ListType = SchemaField(
description="'allow' for whitelist, 'block' for blacklist"
)
entry: str = SchemaField(description="Email address or domain to look up")
class Output(BlockSchemaOutput):
entry: str = SchemaField(
description="The email address or domain that was found"
)
result: dict = SchemaField(description="Complete entry object with metadata")
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="fb117058-ab27-40d1-9231-eb1dd526fc7a",
description="Check if an email address or domain is in an allow/block list. Verify filtering rules.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
result = await client.lists.get(
input_data.direction.value,
input_data.list_type.value,
entry=input_data.entry,
)
result_dict = result.__dict__ if hasattr(result, "__dict__") else {}
yield "entry", input_data.entry
yield "result", result_dict
class AgentMailDeleteListEntryBlock(Block):
"""
Remove an email address or domain from an AgentMail allow/block list.
After removal, the address/domain will no longer be filtered by this list.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
direction: ListDirection = SchemaField(
description="'send' for outgoing rules, 'receive' for incoming rules"
)
list_type: ListType = SchemaField(
description="'allow' for whitelist, 'block' for blacklist"
)
entry: str = SchemaField(
description="Email address or domain to remove from the list"
)
class Output(BlockSchemaOutput):
success: bool = SchemaField(
description="True if the entry was successfully removed"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="2b8d57f1-1c9e-470f-a70b-5991c80fad5f",
description="Remove an email address or domain from an allow/block list to stop filtering it.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
is_sensitive_action=True,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
await client.lists.delete(
input_data.direction.value,
input_data.list_type.value,
entry=input_data.entry,
)
yield "success", True

View File

@@ -0,0 +1,492 @@
"""
AgentMail Message blocks — send, list, get, reply, forward, and update messages.
A Message is an individual email within a Thread. Agents can send new messages
(which create threads), reply to existing messages, forward them, and manage
labels for state tracking (e.g. read/unread, campaign tags).
"""
from agentmail import AsyncAgentMail
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
CredentialsMetaInput,
SchemaField,
)
from ._config import agent_mail
def _client(credentials: APIKeyCredentials) -> AsyncAgentMail:
return AsyncAgentMail(api_key=credentials.api_key.get_secret_value())
class AgentMailSendMessageBlock(Block):
"""
Send a new email from an AgentMail inbox, automatically creating a new thread.
Supports plain text and HTML bodies, CC/BCC recipients, and labels for
organizing messages (e.g. campaign tracking, state management).
Max 50 combined recipients across to, cc, and bcc.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address to send from (e.g. 'agent@agentmail.to')"
)
to: list[str] = SchemaField(
description="Recipient email addresses (e.g. ['user@example.com'])"
)
subject: str = SchemaField(description="Email subject line")
text: str = SchemaField(
description="Plain text body of the email. Always provide this as a fallback for email clients that don't render HTML."
)
html: str = SchemaField(
description="Rich HTML body of the email. Embed CSS in a <style> tag for best compatibility across email clients.",
default="",
advanced=True,
)
cc: list[str] = SchemaField(
description="CC recipient email addresses for human-in-the-loop oversight",
default_factory=list,
advanced=True,
)
bcc: list[str] = SchemaField(
description="BCC recipient email addresses (hidden from other recipients)",
default_factory=list,
advanced=True,
)
labels: list[str] = SchemaField(
description="Labels to tag the message for filtering and state management (e.g. ['outreach', 'q4-campaign'])",
default_factory=list,
advanced=True,
)
class Output(BlockSchemaOutput):
message_id: str = SchemaField(
description="Unique identifier of the sent message"
)
thread_id: str = SchemaField(
description="Thread ID grouping this message and any future replies"
)
result: dict = SchemaField(
description="Complete sent message object with all metadata"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="b67469b2-7748-4d81-a223-4ebd332cca89",
description="Send a new email from an AgentMail inbox. Creates a new conversation thread. Supports HTML, CC/BCC, and labels.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
is_sensitive_action=True,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
total = len(input_data.to) + len(input_data.cc) + len(input_data.bcc)
if total > 50:
raise ValueError(
f"Max 50 combined recipients across to, cc, and bcc (got {total})"
)
client = _client(credentials)
params: dict = {
"to": input_data.to,
"subject": input_data.subject,
"text": input_data.text,
}
if input_data.html:
params["html"] = input_data.html
if input_data.cc:
params["cc"] = input_data.cc
if input_data.bcc:
params["bcc"] = input_data.bcc
if input_data.labels:
params["labels"] = input_data.labels
msg = await client.inboxes.messages.send(input_data.inbox_id, **params)
result = msg.__dict__ if hasattr(msg, "__dict__") else {}
yield "message_id", msg.message_id
yield "thread_id", getattr(msg, "thread_id", None) or ""
yield "result", result
class AgentMailListMessagesBlock(Block):
"""
List all messages in an AgentMail inbox with optional label filtering.
Returns a paginated list of messages. Use labels to filter (e.g.
labels=['unread'] to only get unprocessed messages). Useful for
polling workflows or building inbox views.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address to list messages from"
)
limit: int = SchemaField(
description="Maximum number of messages to return per page (1-100)",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page",
default="",
advanced=True,
)
labels: list[str] = SchemaField(
description="Only return messages with ALL of these labels (e.g. ['unread'] or ['q4-campaign', 'follow-up'])",
default_factory=list,
advanced=True,
)
class Output(BlockSchemaOutput):
messages: list[dict] = SchemaField(
description="List of message objects with subject, sender, text, html, labels, etc."
)
count: int = SchemaField(description="Number of messages returned")
next_page_token: str = SchemaField(
description="Token for the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="721234df-c7a2-4927-b205-744badbd5844",
description="List messages in an AgentMail inbox. Filter by labels to find unread, campaign-tagged, or categorized messages.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
if input_data.labels:
params["labels"] = input_data.labels
response = await client.inboxes.messages.list(input_data.inbox_id, **params)
messages = [
m.__dict__ if hasattr(m, "__dict__") else m
for m in getattr(response, "messages", [])
]
yield "messages", messages
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(messages)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""
class AgentMailGetMessageBlock(Block):
"""
Retrieve a specific email message by ID from an AgentMail inbox.
Returns the full message including subject, body (text and HTML),
sender, recipients, and attachments. Use extracted_text to get
only the new reply content without quoted history.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address the message belongs to"
)
message_id: str = SchemaField(
description="Message ID to retrieve (e.g. '<abc123@agentmail.to>')"
)
class Output(BlockSchemaOutput):
message_id: str = SchemaField(description="Unique identifier of the message")
thread_id: str = SchemaField(description="Thread this message belongs to")
subject: str = SchemaField(description="Email subject line")
text: str = SchemaField(
description="Full plain text body (may include quoted reply history)"
)
extracted_text: str = SchemaField(
description="Just the new reply content with quoted history stripped. Best for AI processing.",
default="",
)
html: str = SchemaField(description="HTML body of the email", default="")
result: dict = SchemaField(
description="Complete message object with all fields including sender, recipients, attachments, labels"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="2788bdfa-1527-4603-a5e4-a455c05c032f",
description="Retrieve a specific email message by ID. Includes extracted_text for clean reply content without quoted history.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
msg = await client.inboxes.messages.get(
inbox_id=input_data.inbox_id,
message_id=input_data.message_id,
)
result = msg.__dict__ if hasattr(msg, "__dict__") else {}
yield "message_id", msg.message_id
yield "thread_id", getattr(msg, "thread_id", None) or ""
yield "subject", getattr(msg, "subject", None) or ""
yield "text", getattr(msg, "text", None) or ""
yield "extracted_text", getattr(msg, "extracted_text", None) or ""
yield "html", getattr(msg, "html", None) or ""
yield "result", result
class AgentMailReplyToMessageBlock(Block):
"""
Reply to an existing email message, keeping the reply in the same thread.
The reply is automatically added to the same conversation thread as the
original message. Use this for multi-turn agent conversations.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address to send the reply from"
)
message_id: str = SchemaField(
description="Message ID to reply to (e.g. '<abc123@agentmail.to>')"
)
text: str = SchemaField(description="Plain text body of the reply")
html: str = SchemaField(
description="Rich HTML body of the reply",
default="",
advanced=True,
)
class Output(BlockSchemaOutput):
message_id: str = SchemaField(
description="Unique identifier of the reply message"
)
thread_id: str = SchemaField(description="Thread ID the reply was added to")
result: dict = SchemaField(
description="Complete reply message object with all metadata"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="b9fe53fa-5026-4547-9570-b54ccb487229",
description="Reply to an existing email in the same conversation thread. Use for multi-turn agent conversations.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
is_sensitive_action=True,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"text": input_data.text}
if input_data.html:
params["html"] = input_data.html
reply = await client.inboxes.messages.reply(
inbox_id=input_data.inbox_id,
message_id=input_data.message_id,
**params,
)
result = reply.__dict__ if hasattr(reply, "__dict__") else {}
yield "message_id", reply.message_id
yield "thread_id", getattr(reply, "thread_id", None) or ""
yield "result", result
class AgentMailForwardMessageBlock(Block):
"""
Forward an existing email message to one or more recipients.
Sends the original message content to different email addresses.
Optionally prepend additional text or override the subject line.
Max 50 combined recipients across to, cc, and bcc.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address to forward from"
)
message_id: str = SchemaField(description="Message ID to forward")
to: list[str] = SchemaField(
description="Recipient email addresses to forward the message to (e.g. ['user@example.com'])"
)
cc: list[str] = SchemaField(
description="CC recipient email addresses",
default_factory=list,
advanced=True,
)
bcc: list[str] = SchemaField(
description="BCC recipient email addresses (hidden from other recipients)",
default_factory=list,
advanced=True,
)
subject: str = SchemaField(
description="Override the subject line (defaults to 'Fwd: <original subject>')",
default="",
advanced=True,
)
text: str = SchemaField(
description="Additional plain text to prepend before the forwarded content",
default="",
advanced=True,
)
html: str = SchemaField(
description="Additional HTML to prepend before the forwarded content",
default="",
advanced=True,
)
class Output(BlockSchemaOutput):
message_id: str = SchemaField(
description="Unique identifier of the forwarded message"
)
thread_id: str = SchemaField(description="Thread ID of the forward")
result: dict = SchemaField(
description="Complete forwarded message object with all metadata"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="b70c7e33-5d66-4f8e-897f-ac73a7bfce82",
description="Forward an email message to one or more recipients. Supports CC/BCC and optional extra text or subject override.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
is_sensitive_action=True,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
total = len(input_data.to) + len(input_data.cc) + len(input_data.bcc)
if total > 50:
raise ValueError(
f"Max 50 combined recipients across to, cc, and bcc (got {total})"
)
client = _client(credentials)
params: dict = {"to": input_data.to}
if input_data.cc:
params["cc"] = input_data.cc
if input_data.bcc:
params["bcc"] = input_data.bcc
if input_data.subject:
params["subject"] = input_data.subject
if input_data.text:
params["text"] = input_data.text
if input_data.html:
params["html"] = input_data.html
fwd = await client.inboxes.messages.forward(
inbox_id=input_data.inbox_id,
message_id=input_data.message_id,
**params,
)
result = fwd.__dict__ if hasattr(fwd, "__dict__") else {}
yield "message_id", fwd.message_id
yield "thread_id", getattr(fwd, "thread_id", None) or ""
yield "result", result
class AgentMailUpdateMessageBlock(Block):
"""
Add or remove labels on an email message for state management.
Labels are string tags used to track message state (read/unread),
categorize messages (billing, support), or tag campaigns (q4-outreach).
Common pattern: add 'read' and remove 'unread' after processing a message.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address the message belongs to"
)
message_id: str = SchemaField(description="Message ID to update labels on")
add_labels: list[str] = SchemaField(
description="Labels to add (e.g. ['read', 'processed', 'high-priority'])",
default_factory=list,
)
remove_labels: list[str] = SchemaField(
description="Labels to remove (e.g. ['unread', 'pending'])",
default_factory=list,
)
class Output(BlockSchemaOutput):
message_id: str = SchemaField(description="The updated message ID")
result: dict = SchemaField(
description="Complete updated message object with current labels"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="694ff816-4c89-4a5e-a552-8c31be187735",
description="Add or remove labels on an email message. Use for read/unread tracking, campaign tagging, or state management.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {}
if input_data.add_labels:
params["add_labels"] = input_data.add_labels
if input_data.remove_labels:
params["remove_labels"] = input_data.remove_labels
msg = await client.inboxes.messages.update(
inbox_id=input_data.inbox_id,
message_id=input_data.message_id,
**params,
)
result = msg.__dict__ if hasattr(msg, "__dict__") else {}
yield "message_id", msg.message_id
yield "result", result

View File

@@ -0,0 +1,486 @@
"""
AgentMail Pod blocks — create, get, list, delete pods and list pod-scoped resources.
Pods provide multi-tenant isolation between your customers. Each pod acts as
an isolated workspace containing its own inboxes, domains, threads, and drafts.
Use pods when building SaaS platforms, agency tools, or AI agent fleets that
serve multiple customers.
"""
from agentmail import AsyncAgentMail
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
CredentialsMetaInput,
SchemaField,
)
from ._config import agent_mail
def _client(credentials: APIKeyCredentials) -> AsyncAgentMail:
return AsyncAgentMail(api_key=credentials.api_key.get_secret_value())
class AgentMailCreatePodBlock(Block):
"""
Create a new pod for multi-tenant customer isolation.
Each pod acts as an isolated workspace for one customer or tenant.
Use client_id to map pods to your internal tenant IDs for idempotent
creation (safe to retry without creating duplicates).
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
client_id: str = SchemaField(
description="Your internal tenant/customer ID for idempotent mapping. Lets you access the pod by your own ID instead of AgentMail's pod_id.",
default="",
)
class Output(BlockSchemaOutput):
pod_id: str = SchemaField(description="Unique identifier of the created pod")
result: dict = SchemaField(description="Complete pod object with all metadata")
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="a2db9784-2d17-4f8f-9d6b-0214e6f22101",
description="Create a new pod for multi-tenant customer isolation. Use client_id to map to your internal tenant IDs.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {}
if input_data.client_id:
params["client_id"] = input_data.client_id
pod = await client.pods.create(**params)
result = pod.__dict__ if hasattr(pod, "__dict__") else {}
yield "pod_id", pod.pod_id
yield "result", result
class AgentMailGetPodBlock(Block):
"""
Retrieve details of an existing pod by its ID.
Returns the pod metadata including its client_id mapping and
creation timestamp.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
pod_id: str = SchemaField(description="Pod ID to retrieve")
class Output(BlockSchemaOutput):
pod_id: str = SchemaField(description="Unique identifier of the pod")
result: dict = SchemaField(description="Complete pod object with all metadata")
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="553361bc-bb1b-4322-9ad4-0c226200217e",
description="Retrieve details of an existing pod including its client_id mapping and metadata.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
pod = await client.pods.get(pod_id=input_data.pod_id)
result = pod.__dict__ if hasattr(pod, "__dict__") else {}
yield "pod_id", pod.pod_id
yield "result", result
class AgentMailListPodsBlock(Block):
"""
List all pods in your AgentMail organization.
Returns a paginated list of all tenant pods with their metadata.
Use this to see all customer workspaces at a glance.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
limit: int = SchemaField(
description="Maximum number of pods to return per page (1-100)",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page",
default="",
advanced=True,
)
class Output(BlockSchemaOutput):
pods: list[dict] = SchemaField(
description="List of pod objects with pod_id, client_id, creation time, etc."
)
count: int = SchemaField(description="Number of pods returned")
next_page_token: str = SchemaField(
description="Token for the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="9d3725ee-2968-431a-a816-857ab41e1420",
description="List all tenant pods in your organization. See all customer workspaces at a glance.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
response = await client.pods.list(**params)
pods = [
p.__dict__ if hasattr(p, "__dict__") else p
for p in getattr(response, "pods", [])
]
yield "pods", pods
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(pods)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""
class AgentMailDeletePodBlock(Block):
"""
Permanently delete a pod. All inboxes and domains must be removed first.
You cannot delete a pod that still contains inboxes or domains.
Delete all child resources first, then delete the pod.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
pod_id: str = SchemaField(
description="Pod ID to permanently delete (must have no inboxes or domains)"
)
class Output(BlockSchemaOutput):
success: bool = SchemaField(
description="True if the pod was successfully deleted"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="f371f8cd-682d-4f5f-905c-529c74a8fb35",
description="Permanently delete a pod. All inboxes and domains must be removed first.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
is_sensitive_action=True,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
await client.pods.delete(pod_id=input_data.pod_id)
yield "success", True
class AgentMailListPodInboxesBlock(Block):
"""
List all inboxes within a specific pod (customer workspace).
Returns only the inboxes belonging to this pod, providing
tenant-scoped visibility.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
pod_id: str = SchemaField(description="Pod ID to list inboxes from")
limit: int = SchemaField(
description="Maximum number of inboxes to return per page (1-100)",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page",
default="",
advanced=True,
)
class Output(BlockSchemaOutput):
inboxes: list[dict] = SchemaField(
description="List of inbox objects within this pod"
)
count: int = SchemaField(description="Number of inboxes returned")
next_page_token: str = SchemaField(
description="Token for the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="a8c17ce0-b7c1-4bc3-ae39-680e1952e5d0",
description="List all inboxes within a pod. View email accounts scoped to a specific customer.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
response = await client.pods.inboxes.list(pod_id=input_data.pod_id, **params)
inboxes = [
i.__dict__ if hasattr(i, "__dict__") else i
for i in getattr(response, "inboxes", [])
]
yield "inboxes", inboxes
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(inboxes)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""
class AgentMailListPodThreadsBlock(Block):
"""
List all conversation threads across all inboxes within a pod.
Returns threads from every inbox in the pod. Use for building
per-customer dashboards showing all email activity, or for
supervisor agents monitoring a customer's conversations.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
pod_id: str = SchemaField(description="Pod ID to list threads from")
limit: int = SchemaField(
description="Maximum number of threads to return per page (1-100)",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page",
default="",
advanced=True,
)
labels: list[str] = SchemaField(
description="Only return threads matching ALL of these labels",
default_factory=list,
advanced=True,
)
class Output(BlockSchemaOutput):
threads: list[dict] = SchemaField(
description="List of thread objects from all inboxes in this pod"
)
count: int = SchemaField(description="Number of threads returned")
next_page_token: str = SchemaField(
description="Token for the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="80214f08-8b85-4533-a6b8-f8123bfcb410",
description="List all conversation threads across all inboxes within a pod. View all email activity for a customer.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
if input_data.labels:
params["labels"] = input_data.labels
response = await client.pods.threads.list(pod_id=input_data.pod_id, **params)
threads = [
t.__dict__ if hasattr(t, "__dict__") else t
for t in getattr(response, "threads", [])
]
yield "threads", threads
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(threads)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""
class AgentMailListPodDraftsBlock(Block):
"""
List all drafts across all inboxes within a pod.
Returns pending drafts from every inbox in the pod. Use for
per-customer approval dashboards or monitoring scheduled sends.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
pod_id: str = SchemaField(description="Pod ID to list drafts from")
limit: int = SchemaField(
description="Maximum number of drafts to return per page (1-100)",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page",
default="",
advanced=True,
)
class Output(BlockSchemaOutput):
drafts: list[dict] = SchemaField(
description="List of draft objects from all inboxes in this pod"
)
count: int = SchemaField(description="Number of drafts returned")
next_page_token: str = SchemaField(
description="Token for the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="12fd7a3e-51ad-4b20-97c1-0391f207f517",
description="List all drafts across all inboxes within a pod. View pending emails for a customer.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
response = await client.pods.drafts.list(pod_id=input_data.pod_id, **params)
drafts = [
d.__dict__ if hasattr(d, "__dict__") else d
for d in getattr(response, "drafts", [])
]
yield "drafts", drafts
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(drafts)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""
class AgentMailCreatePodInboxBlock(Block):
"""
Create a new email inbox within a specific pod (customer workspace).
The inbox is automatically scoped to the pod and inherits its
isolation guarantees. If username/domain are not provided,
AgentMail auto-generates a unique address.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
pod_id: str = SchemaField(description="Pod ID to create the inbox in")
username: str = SchemaField(
description="Local part of the email address (e.g. 'support'). Leave empty to auto-generate.",
default="",
)
domain: str = SchemaField(
description="Email domain (e.g. 'mydomain.com'). Defaults to agentmail.to if empty.",
default="",
)
display_name: str = SchemaField(
description="Friendly name shown in the 'From' field (e.g. 'Customer Support')",
default="",
)
class Output(BlockSchemaOutput):
inbox_id: str = SchemaField(
description="Unique identifier of the created inbox"
)
email_address: str = SchemaField(description="Full email address of the inbox")
result: dict = SchemaField(
description="Complete inbox object with all metadata"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="c6862373-1ac6-402e-89e6-7db1fea882af",
description="Create a new email inbox within a pod. The inbox is scoped to the customer workspace.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {}
if input_data.username:
params["username"] = input_data.username
if input_data.domain:
params["domain"] = input_data.domain
if input_data.display_name:
params["display_name"] = input_data.display_name
inbox = await client.pods.inboxes.create(pod_id=input_data.pod_id, **params)
result = inbox.__dict__ if hasattr(inbox, "__dict__") else {}
yield "inbox_id", inbox.inbox_id
yield "email_address", getattr(inbox, "email_address", inbox.inbox_id)
yield "result", result

View File

@@ -0,0 +1,329 @@
"""
AgentMail Thread blocks — list, get, and delete conversation threads.
A Thread groups related messages into a single conversation. Threads are
created automatically when a new message is sent and grow as replies are added.
Threads can be queried per-inbox or across the entire organization.
"""
from agentmail import AsyncAgentMail
from backend.sdk import (
APIKeyCredentials,
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
CredentialsMetaInput,
SchemaField,
)
from ._config import agent_mail
def _client(credentials: APIKeyCredentials) -> AsyncAgentMail:
return AsyncAgentMail(api_key=credentials.api_key.get_secret_value())
class AgentMailListInboxThreadsBlock(Block):
"""
List all conversation threads within a specific AgentMail inbox.
Returns a paginated list of threads with optional label filtering.
Use labels to find threads by campaign, status, or custom tags.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address to list threads from"
)
limit: int = SchemaField(
description="Maximum number of threads to return per page (1-100)",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page",
default="",
advanced=True,
)
labels: list[str] = SchemaField(
description="Only return threads matching ALL of these labels (e.g. ['q4-campaign', 'follow-up'])",
default_factory=list,
advanced=True,
)
class Output(BlockSchemaOutput):
threads: list[dict] = SchemaField(
description="List of thread objects with thread_id, subject, message count, labels, etc."
)
count: int = SchemaField(description="Number of threads returned")
next_page_token: str = SchemaField(
description="Token for the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="63dd9e2d-ef81-405c-b034-c031f0437334",
description="List all conversation threads in an AgentMail inbox. Filter by labels for campaign tracking or status management.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
if input_data.labels:
params["labels"] = input_data.labels
response = await client.inboxes.threads.list(
inbox_id=input_data.inbox_id, **params
)
threads = [
t.__dict__ if hasattr(t, "__dict__") else t
for t in getattr(response, "threads", [])
]
yield "threads", threads
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(threads)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""
class AgentMailGetInboxThreadBlock(Block):
"""
Retrieve a single conversation thread from an AgentMail inbox.
Returns the thread with all its messages in chronological order.
Use this to get the full conversation history for context when
composing replies.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address the thread belongs to"
)
thread_id: str = SchemaField(description="Thread ID to retrieve")
class Output(BlockSchemaOutput):
thread_id: str = SchemaField(description="Unique identifier of the thread")
messages: list[dict] = SchemaField(
description="All messages in the thread, in chronological order"
)
result: dict = SchemaField(
description="Complete thread object with all metadata"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="42866290-1479-4153-83e7-550b703e9da2",
description="Retrieve a conversation thread with all its messages. Use for getting full conversation context before replying.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
thread = await client.inboxes.threads.get(
inbox_id=input_data.inbox_id,
thread_id=input_data.thread_id,
)
messages = [
m.__dict__ if hasattr(m, "__dict__") else m
for m in getattr(thread, "messages", [])
]
result = dict(thread.__dict__) if hasattr(thread, "__dict__") else {}
if "messages" in result:
result["messages"] = messages
yield "thread_id", thread.thread_id
yield "messages", messages
yield "result", result
class AgentMailDeleteInboxThreadBlock(Block):
"""
Permanently delete a conversation thread and all its messages from an inbox.
This removes the thread and every message within it. This action
cannot be undone.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
inbox_id: str = SchemaField(
description="Inbox ID or email address the thread belongs to"
)
thread_id: str = SchemaField(description="Thread ID to permanently delete")
class Output(BlockSchemaOutput):
success: bool = SchemaField(
description="True if the thread was successfully deleted"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="18cd5f6f-4ff6-45da-8300-25a50ea7fb75",
description="Permanently delete a conversation thread and all its messages. This action cannot be undone.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
is_sensitive_action=True,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
await client.inboxes.threads.delete(
inbox_id=input_data.inbox_id,
thread_id=input_data.thread_id,
)
yield "success", True
class AgentMailListOrgThreadsBlock(Block):
"""
List conversation threads across ALL inboxes in your organization.
Unlike per-inbox listing, this returns threads from every inbox.
Ideal for building supervisor agents that monitor all conversations,
analytics dashboards, or cross-agent routing workflows.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
limit: int = SchemaField(
description="Maximum number of threads to return per page (1-100)",
default=20,
advanced=True,
)
page_token: str = SchemaField(
description="Token from a previous response to fetch the next page",
default="",
advanced=True,
)
labels: list[str] = SchemaField(
description="Only return threads matching ALL of these labels",
default_factory=list,
advanced=True,
)
class Output(BlockSchemaOutput):
threads: list[dict] = SchemaField(
description="List of thread objects from all inboxes in the organization"
)
count: int = SchemaField(description="Number of threads returned")
next_page_token: str = SchemaField(
description="Token for the next page. Empty if no more results.",
default="",
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="d7a0657b-58ab-48b2-898b-7bd94f44a708",
description="List threads across ALL inboxes in your organization. Use for supervisor agents, dashboards, or cross-agent monitoring.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
params: dict = {"limit": input_data.limit}
if input_data.page_token:
params["page_token"] = input_data.page_token
if input_data.labels:
params["labels"] = input_data.labels
response = await client.threads.list(**params)
threads = [
t.__dict__ if hasattr(t, "__dict__") else t
for t in getattr(response, "threads", [])
]
yield "threads", threads
yield "count", (
c if (c := getattr(response, "count", None)) is not None else len(threads)
)
yield "next_page_token", getattr(response, "next_page_token", "") or ""
class AgentMailGetOrgThreadBlock(Block):
"""
Retrieve a single conversation thread by ID from anywhere in the organization.
Works without needing to know which inbox the thread belongs to.
Returns the thread with all its messages in chronological order.
"""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput = agent_mail.credentials_field(
description="AgentMail API key from https://console.agentmail.to"
)
thread_id: str = SchemaField(
description="Thread ID to retrieve (works across all inboxes)"
)
class Output(BlockSchemaOutput):
thread_id: str = SchemaField(description="Unique identifier of the thread")
messages: list[dict] = SchemaField(
description="All messages in the thread, in chronological order"
)
result: dict = SchemaField(
description="Complete thread object with all metadata"
)
error: str = SchemaField(description="Error message if the operation failed")
def __init__(self):
super().__init__(
id="39aaae31-3eb1-44c6-9e37-5a44a4529649",
description="Retrieve a conversation thread by ID from anywhere in the organization, without needing the inbox ID.",
categories={BlockCategory.COMMUNICATION},
input_schema=self.Input,
output_schema=self.Output,
)
async def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
client = _client(credentials)
thread = await client.threads.get(thread_id=input_data.thread_id)
messages = [
m.__dict__ if hasattr(m, "__dict__") else m
for m in getattr(thread, "messages", [])
]
result = dict(thread.__dict__) if hasattr(thread, "__dict__") else {}
if "messages" in result:
result["messages"] = messages
yield "thread_id", thread.thread_id
yield "messages", messages
yield "result", result

View File

@@ -183,8 +183,7 @@ class WorkspaceManager:
f"{Config().max_file_size_mb}MB limit"
)
# Scan here — callers must NOT duplicate this scan.
# WorkspaceManager owns virus scanning for all persisted files.
# Virus scan content before persisting (defense in depth)
await scan_content_safe(content, filename=filename)
# Determine path with session scoping

View File

@@ -1,5 +1,24 @@
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "agentmail"
version = "0.4.7"
description = ""
optional = false
python-versions = "<4.0,>=3.8"
groups = ["main"]
files = [
{file = "agentmail-0.4.7-py3-none-any.whl", hash = "sha256:059fcd1bb6ed0d9c8aea33419e0592d2a1d7fce810c9756b43e7fb40da484eb0"},
{file = "agentmail-0.4.7.tar.gz", hash = "sha256:5648dffb5bedd8f165d633271873de61362d5a76e8b5256e8916ecdd859539b9"},
]
[package.dependencies]
httpx = ">=0.21.2"
pydantic = ">=1.9.2"
pydantic-core = ">=2.18.2"
typing_extensions = ">=4.0.0"
websockets = ">=12.0"
[[package]]
name = "aio-pika"
version = "9.5.8"
@@ -8969,4 +8988,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.14"
content-hash = "86dab25684dd46e635a33bd33281a926e5626a874ecc048c34389fecf34a87d8"
content-hash = "b5888ff2e8bf296540994d9a620d67bd2a4a50b418c2159fa38f0b7af1e7b538"

View File

@@ -12,6 +12,7 @@ python = ">=3.10,<3.14"
aio-pika = "^9.5.5"
aiohttp = "^3.10.0"
aiodns = "^3.5.0"
agentmail = "^0.4.5"
anthropic = "^0.79.0"
apscheduler = "^3.11.1"
autogpt-libs = { path = "../autogpt_libs", develop = true }

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

View File

@@ -418,6 +418,43 @@ Below is a comprehensive list of all available blocks, categorized by their prim
| Block Name | Description |
|------------|-------------|
| [Agent Mail Create Draft](block-integrations/agent_mail/drafts.md#agent-mail-create-draft) | Create a draft email for review or scheduled sending |
| [Agent Mail Create Inbox](block-integrations/agent_mail/inbox.md#agent-mail-create-inbox) | Create a new email inbox for an AI agent via AgentMail |
| [Agent Mail Create List Entry](block-integrations/agent_mail/lists.md#agent-mail-create-list-entry) | Add an email address or domain to an allow/block list |
| [Agent Mail Create Pod](block-integrations/agent_mail/pods.md#agent-mail-create-pod) | Create a new pod for multi-tenant customer isolation |
| [Agent Mail Create Pod Inbox](block-integrations/agent_mail/pods.md#agent-mail-create-pod-inbox) | Create a new email inbox within a pod |
| [Agent Mail Delete Draft](block-integrations/agent_mail/drafts.md#agent-mail-delete-draft) | Delete a draft or cancel a scheduled email |
| [Agent Mail Delete Inbox](block-integrations/agent_mail/inbox.md#agent-mail-delete-inbox) | Permanently delete an AgentMail inbox and all its messages, threads, and drafts |
| [Agent Mail Delete Inbox Thread](block-integrations/agent_mail/threads.md#agent-mail-delete-inbox-thread) | Permanently delete a conversation thread and all its messages |
| [Agent Mail Delete List Entry](block-integrations/agent_mail/lists.md#agent-mail-delete-list-entry) | Remove an email address or domain from an allow/block list to stop filtering it |
| [Agent Mail Delete Pod](block-integrations/agent_mail/pods.md#agent-mail-delete-pod) | Permanently delete a pod |
| [Agent Mail Forward Message](block-integrations/agent_mail/messages.md#agent-mail-forward-message) | Forward an email message to one or more recipients |
| [Agent Mail Get Draft](block-integrations/agent_mail/drafts.md#agent-mail-get-draft) | Retrieve a draft email to review its contents, recipients, and scheduled send status |
| [Agent Mail Get Inbox](block-integrations/agent_mail/inbox.md#agent-mail-get-inbox) | Retrieve details of an existing AgentMail inbox including its email address, display name, and configuration |
| [Agent Mail Get Inbox Thread](block-integrations/agent_mail/threads.md#agent-mail-get-inbox-thread) | Retrieve a conversation thread with all its messages |
| [Agent Mail Get List Entry](block-integrations/agent_mail/lists.md#agent-mail-get-list-entry) | Check if an email address or domain is in an allow/block list |
| [Agent Mail Get Message](block-integrations/agent_mail/messages.md#agent-mail-get-message) | Retrieve a specific email message by ID |
| [Agent Mail Get Message Attachment](block-integrations/agent_mail/attachments.md#agent-mail-get-message-attachment) | Download a file attachment from an email message |
| [Agent Mail Get Org Thread](block-integrations/agent_mail/threads.md#agent-mail-get-org-thread) | Retrieve a conversation thread by ID from anywhere in the organization, without needing the inbox ID |
| [Agent Mail Get Pod](block-integrations/agent_mail/pods.md#agent-mail-get-pod) | Retrieve details of an existing pod including its client_id mapping and metadata |
| [Agent Mail Get Thread Attachment](block-integrations/agent_mail/attachments.md#agent-mail-get-thread-attachment) | Download a file attachment from a conversation thread |
| [Agent Mail List Drafts](block-integrations/agent_mail/drafts.md#agent-mail-list-drafts) | List drafts in an AgentMail inbox |
| [Agent Mail List Entries](block-integrations/agent_mail/lists.md#agent-mail-list-entries) | List all entries in an AgentMail allow/block list |
| [Agent Mail List Inbox Threads](block-integrations/agent_mail/threads.md#agent-mail-list-inbox-threads) | List all conversation threads in an AgentMail inbox |
| [Agent Mail List Inboxes](block-integrations/agent_mail/inbox.md#agent-mail-list-inboxes) | List all email inboxes in your AgentMail organization with pagination support |
| [Agent Mail List Messages](block-integrations/agent_mail/messages.md#agent-mail-list-messages) | List messages in an AgentMail inbox |
| [Agent Mail List Org Drafts](block-integrations/agent_mail/drafts.md#agent-mail-list-org-drafts) | List all drafts across every inbox in your organization |
| [Agent Mail List Org Threads](block-integrations/agent_mail/threads.md#agent-mail-list-org-threads) | List threads across ALL inboxes in your organization |
| [Agent Mail List Pod Drafts](block-integrations/agent_mail/pods.md#agent-mail-list-pod-drafts) | List all drafts across all inboxes within a pod |
| [Agent Mail List Pod Inboxes](block-integrations/agent_mail/pods.md#agent-mail-list-pod-inboxes) | List all inboxes within a pod |
| [Agent Mail List Pod Threads](block-integrations/agent_mail/pods.md#agent-mail-list-pod-threads) | List all conversation threads across all inboxes within a pod |
| [Agent Mail List Pods](block-integrations/agent_mail/pods.md#agent-mail-list-pods) | List all tenant pods in your organization |
| [Agent Mail Reply To Message](block-integrations/agent_mail/messages.md#agent-mail-reply-to-message) | Reply to an existing email in the same conversation thread |
| [Agent Mail Send Draft](block-integrations/agent_mail/drafts.md#agent-mail-send-draft) | Send a draft immediately, converting it into a delivered message |
| [Agent Mail Send Message](block-integrations/agent_mail/messages.md#agent-mail-send-message) | Send a new email from an AgentMail inbox |
| [Agent Mail Update Draft](block-integrations/agent_mail/drafts.md#agent-mail-update-draft) | Update a draft's content, recipients, or scheduled send time |
| [Agent Mail Update Inbox](block-integrations/agent_mail/inbox.md#agent-mail-update-inbox) | Update the display name of an AgentMail inbox |
| [Agent Mail Update Message](block-integrations/agent_mail/messages.md#agent-mail-update-message) | Add or remove labels on an email message |
| [Baas Bot Join Meeting](block-integrations/baas/bots.md#baas-bot-join-meeting) | Deploy a bot to join and record a meeting |
| [Baas Bot Leave Meeting](block-integrations/baas/bots.md#baas-bot-leave-meeting) | Remove a bot from an ongoing meeting |
| [Gmail Add Label](block-integrations/google/gmail.md#gmail-add-label) | A block that adds a label to a specific email message in Gmail, creating the label if it doesn't exist |

View File

@@ -9,6 +9,13 @@
## Block Integrations
* [Agent Mail Attachments](block-integrations/agent_mail/attachments.md)
* [Agent Mail Drafts](block-integrations/agent_mail/drafts.md)
* [Agent Mail Inbox](block-integrations/agent_mail/inbox.md)
* [Agent Mail Lists](block-integrations/agent_mail/lists.md)
* [Agent Mail Messages](block-integrations/agent_mail/messages.md)
* [Agent Mail Pods](block-integrations/agent_mail/pods.md)
* [Agent Mail Threads](block-integrations/agent_mail/threads.md)
* [Airtable Bases](block-integrations/airtable/bases.md)
* [Airtable Records](block-integrations/airtable/records.md)
* [Airtable Schema](block-integrations/airtable/schema.md)

View File

@@ -0,0 +1,78 @@
# Agent Mail Attachments
<!-- MANUAL: file_description -->
Blocks for downloading file attachments from AgentMail messages and threads. Attachments are files associated with messages (PDFs, CSVs, images, etc.) and are returned as base64-encoded content.
<!-- END MANUAL -->
## Agent Mail Get Message Attachment
### What it is
Download a file attachment from an email message. Returns base64-encoded file content.
### How it works
<!-- MANUAL: how_it_works -->
The block calls the AgentMail API's `inboxes.messages.get_attachment` endpoint using the provided inbox ID, message ID, and attachment ID. The API returns the raw file content, which may arrive as `bytes` or `str` depending on the attachment type. The block base64-encodes the result: binary data is encoded directly, while string data is first UTF-8 encoded then base64-encoded. If the API returns an unexpected data type, the block raises a `TypeError`.
On any failure — invalid IDs, network errors, authentication issues, or unexpected response types — the block catches the exception and yields the error message on the `error` output instead of `content_base64`. No partial results are returned; the block either yields both `content_base64` and `attachment_id` on success, or only `error` on failure.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address the message belongs to | str | Yes |
| message_id | Message ID containing the attachment | str | Yes |
| attachment_id | Attachment ID to download (from the message's attachments array) | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| content_base64 | File content encoded as a base64 string. Decode with base64.b64decode() to get raw bytes. | str |
| attachment_id | The attachment ID that was downloaded | str |
### Possible use case
<!-- MANUAL: use_case -->
**Invoice Processing Pipeline** — Download PDF invoices from incoming messages and feed them into a parsing block that extracts line items and totals.
**Automated Attachment Archival** — Pull attachments from specific senders and store the base64 content in a database or cloud bucket for long-term retention.
**Image Analysis Workflow** — Retrieve image attachments from support emails and pass them to a vision model block for classification or OCR.
<!-- END MANUAL -->
---
## Agent Mail Get Thread Attachment
### What it is
Download a file attachment from a conversation thread. Returns base64-encoded file content.
### How it works
<!-- MANUAL: how_it_works -->
The block calls the AgentMail API's `inboxes.threads.get_attachment` endpoint using the provided inbox ID, thread ID, and attachment ID. This is functionally identical to the message attachment block but resolves the attachment via a thread rather than a specific message — useful when you have the thread context but not the individual message ID. The raw response is base64-encoded the same way: `bytes` are encoded directly, `str` is UTF-8 encoded first, and any other response type triggers a `TypeError`.
Error handling follows the same all-or-nothing pattern: on success the block yields `content_base64` and `attachment_id`; on any exception (bad IDs, auth failure, network error, unexpected type) it yields only `error` with the exception message.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address the thread belongs to | str | Yes |
| thread_id | Thread ID containing the attachment | str | Yes |
| attachment_id | Attachment ID to download (from a message's attachments array within the thread) | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| content_base64 | File content encoded as a base64 string. Decode with base64.b64decode() to get raw bytes. | str |
| attachment_id | The attachment ID that was downloaded | str |
### Possible use case
<!-- MANUAL: use_case -->
**Conversation Attachment Collector** — Iterate over a support thread and download every attachment to build a complete case file for review.
**Threaded Report Extraction** — Pull CSV or Excel attachments from recurring report threads and forward them to a data-processing block.
**Compliance Document Retrieval** — Download signed-document attachments from legal threads and pass them to a verification or archival workflow.
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,291 @@
# Agent Mail Drafts
<!-- MANUAL: file_description -->
Blocks for creating, reviewing, editing, sending, and deleting email drafts in AgentMail. Drafts enable human-in-the-loop review, scheduled sending, and multi-step email composition workflows.
<!-- END MANUAL -->
## Agent Mail Create Draft
### What it is
Create a draft email for review or scheduled sending. Use send_at for automatic future delivery.
### How it works
<!-- MANUAL: how_it_works -->
The block calls `client.inboxes.drafts.create(inbox_id, **params)` to create a new draft in the specified inbox. You must provide at least the recipient list; subject, text body, HTML body, cc, bcc, and in_reply_to are all optional.
If you supply `send_at`, the draft is scheduled for automatic delivery at that time and the returned `send_status` will be `scheduled`. If `send_at` is omitted, the draft remains unsent until you explicitly send it with the Send Draft block. Any errors propagate to the block framework's global error handler, which yields them on the error output.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address to create the draft in | str | Yes |
| to | Recipient email addresses (e.g. ['user@example.com']) | List[str] | Yes |
| subject | Email subject line | str | No |
| text | Plain text body of the draft | str | No |
| html | Rich HTML body of the draft | str | No |
| cc | CC recipient email addresses | List[str] | No |
| bcc | BCC recipient email addresses | List[str] | No |
| in_reply_to | Message ID this draft replies to, for threading follow-up drafts | str | No |
| send_at | Schedule automatic sending at this ISO 8601 datetime (e.g. '2025-01-15T09:00:00Z'). Leave empty for manual send. | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| draft_id | Unique identifier of the created draft | str |
| send_status | 'scheduled' if send_at was set, empty otherwise. Values: scheduled, sending, failed. | str |
| result | Complete draft object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
**Human-in-the-Loop Review** — Create a draft so a human can review and approve the email before it is sent.
**Scheduled Outreach** — Set `send_at` to queue a follow-up email that delivers automatically at a future date and time.
**Multi-Step Composition** — Create a draft with initial content, then use the Update Draft block to refine recipients or body text in later workflow steps.
<!-- END MANUAL -->
---
## Agent Mail Delete Draft
### What it is
Delete a draft or cancel a scheduled email. Removes the draft permanently.
### How it works
<!-- MANUAL: how_it_works -->
The block calls `client.inboxes.drafts.delete(inbox_id, draft_id)` to permanently remove the specified draft. If the draft was scheduled for future delivery, deleting it also cancels that scheduled send.
On success the block yields `success=True`. Any errors propagate to the block framework's global error handler, which yields them on the error output.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address the draft belongs to | str | Yes |
| draft_id | Draft ID to delete (also cancels scheduled sends) | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| success | True if the draft was successfully deleted/cancelled | bool |
### Possible use case
<!-- MANUAL: use_case -->
**Cancel Scheduled Send** — Delete a draft that was scheduled with `send_at` to prevent it from being delivered.
**Clean Up Rejected Drafts** — Remove drafts that a human reviewer has declined during an approval workflow.
**Abort Workflow** — Delete an in-progress draft when upstream conditions change and the email is no longer needed.
<!-- END MANUAL -->
---
## Agent Mail Get Draft
### What it is
Retrieve a draft email to review its contents, recipients, and scheduled send status.
### How it works
<!-- MANUAL: how_it_works -->
The block calls `client.inboxes.drafts.get(inbox_id, draft_id)` to fetch a single draft by its ID. It returns the draft's subject, send status, scheduled send time, and the complete draft object.
Any errors propagate to the block framework's global error handler, which yields them on the error output.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address the draft belongs to | str | Yes |
| draft_id | Draft ID to retrieve | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| draft_id | Unique identifier of the draft | str |
| subject | Draft subject line | str |
| send_status | Scheduled send status: 'scheduled', 'sending', 'failed', or empty | str |
| send_at | Scheduled send time (ISO 8601) if set | str |
| result | Complete draft object with all fields | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
**Approval Gate** — Fetch a draft so a human reviewer can inspect its content and recipients before approving it for send.
**Schedule Monitoring** — Retrieve a scheduled draft to check its `send_status` and confirm it is still queued for delivery.
**Content Verification** — Read back a draft after creation or update to verify that the subject and body match expectations before proceeding.
<!-- END MANUAL -->
---
## Agent Mail List Drafts
### What it is
List drafts in an AgentMail inbox. Filter by labels=['scheduled'] to find pending sends.
### How it works
<!-- MANUAL: how_it_works -->
The block calls `client.inboxes.drafts.list(inbox_id, **params)` to retrieve drafts from a single inbox. You can control page size with `limit`, paginate with `page_token`, and filter by `labels` (for example, `['scheduled']` to find only pending sends).
The block returns the list of draft objects, a count of drafts in the current page, and a `next_page_token` for fetching subsequent pages. Any errors propagate to the block framework's global error handler, which yields them on the error output.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address to list drafts from | str | Yes |
| limit | Maximum number of drafts to return per page (1-100) | int | No |
| page_token | Token from a previous response to fetch the next page | str | No |
| labels | Filter drafts by labels (e.g. ['scheduled'] for pending sends) | List[str] | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| drafts | List of draft objects with subject, recipients, send_status, etc. | List[Dict[str, Any]] |
| count | Number of drafts returned | int |
| next_page_token | Token for the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
**Inbox Dashboard** — List all drafts in an inbox to display a queue of pending emails awaiting review or approval.
**Scheduled Send Audit** — Filter by `labels=['scheduled']` to surface every draft that is queued for future delivery and verify send times.
**Batch Processing** — Paginate through all drafts in an inbox to perform bulk updates or deletions in a cleanup workflow.
<!-- END MANUAL -->
---
## Agent Mail List Org Drafts
### What it is
List all drafts across every inbox in your organization. Use for central approval dashboards.
### How it works
<!-- MANUAL: how_it_works -->
The block calls `client.drafts.list(**params)` at the organization level, so it returns drafts across every inbox without requiring an inbox ID. You can control page size with `limit` and paginate with `page_token`.
The block returns the list of draft objects, a count of drafts in the current page, and a `next_page_token` for fetching subsequent pages. Any errors propagate to the block framework's global error handler, which yields them on the error output.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| limit | Maximum number of drafts to return per page (1-100) | int | No |
| page_token | Token from a previous response to fetch the next page | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| drafts | List of draft objects from all inboxes in the organization | List[Dict[str, Any]] |
| count | Number of drafts returned | int |
| next_page_token | Token for the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
**Central Approval Dashboard** — List every pending draft across all inboxes so a manager can review and approve outbound emails in one place.
**Organization-Wide Analytics** — Count and categorize drafts across inboxes to report on email pipeline volume and bottlenecks.
**Stale Draft Cleanup** — Paginate through all org drafts to identify and delete old or abandoned drafts that were never sent.
<!-- END MANUAL -->
---
## Agent Mail Send Draft
### What it is
Send a draft immediately, converting it into a delivered message. The draft is deleted after sending.
### How it works
<!-- MANUAL: how_it_works -->
The block calls `client.inboxes.drafts.send(inbox_id, draft_id)` to deliver the draft immediately. Once the send completes, the draft is deleted from the inbox and a message object is returned in its place.
The block yields the `message_id` and `thread_id` of the newly sent email along with the complete message result. Any errors propagate to the block framework's global error handler, which yields them on the error output.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address the draft belongs to | str | Yes |
| draft_id | Draft ID to send now | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | Message ID of the now-sent email (draft is deleted) | str |
| thread_id | Thread ID the sent message belongs to | str |
| result | Complete sent message object | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
**Post-Approval Dispatch** — Send a draft immediately after a human reviewer approves it in a review workflow.
**On-Demand Notifications** — Compose a draft earlier in the workflow and send it only when a triggering event occurs.
**Retry After Edit** — After updating a draft that failed validation, send it again without recreating it from scratch.
<!-- END MANUAL -->
---
## Agent Mail Update Draft
### What it is
Update a draft's content, recipients, or scheduled send time. Use to reschedule or edit before sending.
### How it works
<!-- MANUAL: how_it_works -->
The block calls `client.inboxes.drafts.update(inbox_id, draft_id, **params)` to modify an existing draft. Only the fields you provide are changed; omitted fields are left untouched. Internally, `None` is used to distinguish between "omit this field" and "clear this field to empty," so you can selectively update recipients, subject, body, or scheduled send time without affecting other fields.
The block returns the updated `draft_id`, `send_status`, and the complete draft result. Any errors propagate to the block framework's global error handler, which yields them on the error output.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address the draft belongs to | str | Yes |
| draft_id | Draft ID to update | str | Yes |
| to | Updated recipient email addresses (replaces existing list). Omit to keep current value. | List[str] | No |
| subject | Updated subject line. Omit to keep current value. | str | No |
| text | Updated plain text body. Omit to keep current value. | str | No |
| html | Updated HTML body. Omit to keep current value. | str | No |
| send_at | Reschedule: new ISO 8601 send time (e.g. '2025-01-20T14:00:00Z'). Omit to keep current value. | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| draft_id | The updated draft ID | str |
| send_status | Updated send status | str |
| result | Complete updated draft object | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
**Reschedule Delivery** — Change the `send_at` time on a scheduled draft to delay or advance its delivery window.
**Reviewer Edits** — Allow a human reviewer to modify the subject or body of a draft before it is approved and sent.
**Dynamic Recipient Updates** — Update the recipient list based on data gathered in earlier workflow steps without recreating the draft.
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,196 @@
# Agent Mail Inbox
<!-- MANUAL: file_description -->
Blocks for creating, retrieving, listing, updating, and deleting AgentMail inboxes. An Inbox is a fully programmable email account for AI agents — each inbox gets a unique email address and can send, receive, and manage emails via the AgentMail API.
<!-- END MANUAL -->
## Agent Mail Create Inbox
### What it is
Create a new email inbox for an AI agent via AgentMail. Each inbox gets a unique address and can send/receive emails.
### How it works
<!-- MANUAL: how_it_works -->
This block calls the AgentMail API to provision a new inbox. You can optionally specify a username (local part of the address), a custom domain, and a display name. Any parameters left empty use sensible defaults — the username is auto-generated and the domain defaults to `agentmail.to`.
The API returns the newly created inbox object, from which the block extracts the inbox ID and full email address as separate outputs. The complete inbox metadata is also available as a dictionary for downstream blocks that need additional fields. If the API call fails, the error propagates to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| username | Local part of the email address (e.g. 'support' for support@domain.com). Leave empty to auto-generate. | str | No |
| domain | Email domain (e.g. 'mydomain.com'). Defaults to agentmail.to if empty. | str | No |
| display_name | Friendly name shown in the 'From' field of sent emails (e.g. 'Support Agent') | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| inbox_id | Unique identifier for the created inbox (also the email address) | str |
| email_address | Full email address of the inbox (e.g. support@agentmail.to) | str |
| result | Complete inbox object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
**Per-Customer Support Agents** — Spin up a dedicated inbox for each customer so an AI agent can handle their support requests through a personalized email address.
**Campaign-Specific Outreach** — Create a branded inbox for each marketing campaign so replies are automatically routed to the agent managing that campaign.
**Ephemeral Verification Workflows** — Generate a temporary inbox on the fly to receive a sign-up confirmation code, then pass the address to a registration block.
<!-- END MANUAL -->
---
## Agent Mail Delete Inbox
### What it is
Permanently delete an AgentMail inbox and all its messages, threads, and drafts. This action cannot be undone.
### How it works
<!-- MANUAL: how_it_works -->
This block sends a delete request to the AgentMail API using the provided inbox ID. The API permanently removes the inbox along with all associated messages, threads, and drafts. This action is irreversible.
On success the block outputs `success=True`. If the API returns an error (e.g., the inbox does not exist), the exception propagates to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address to permanently delete | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| success | True if the inbox was successfully deleted | bool |
### Possible use case
<!-- MANUAL: use_case -->
**Cleanup After Task Completion** — Delete a temporary inbox once an agent finishes processing a one-off workflow like order confirmation or password reset.
**User Offboarding** — Remove an agent's inbox when a customer cancels their account to avoid accumulating unused resources.
**Test Environment Teardown** — Automatically delete inboxes created during integration tests so the organization stays clean between runs.
<!-- END MANUAL -->
---
## Agent Mail Get Inbox
### What it is
Retrieve details of an existing AgentMail inbox including its email address, display name, and configuration.
### How it works
<!-- MANUAL: how_it_works -->
This block calls the AgentMail API to retrieve the details of a single inbox identified by its inbox ID (which is also its email address). The API returns the full inbox object including its email address, display name, and any other metadata.
The block extracts the inbox ID, email address, and display name as individual outputs for easy wiring to downstream blocks. The complete inbox object is also returned as a dictionary. If the inbox does not exist or the request fails, the error propagates to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address to look up (e.g. 'support@agentmail.to') | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| inbox_id | Unique identifier of the inbox | str |
| email_address | Full email address of the inbox | str |
| display_name | Friendly name shown in the 'From' field | str |
| result | Complete inbox object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
**Pre-Send Validation** — Fetch inbox details before sending an email to confirm the inbox still exists and verify its display name is correct.
**Dashboard Display** — Retrieve inbox metadata to show the agent's email address and display name in a monitoring dashboard.
**Conditional Routing** — Look up an inbox's properties to decide which downstream workflow should handle incoming messages for that address.
<!-- END MANUAL -->
---
## Agent Mail List Inboxes
### What it is
List all email inboxes in your AgentMail organization with pagination support.
### How it works
<!-- MANUAL: how_it_works -->
This block calls the AgentMail API to list all inboxes in the organization. You can control page size with the `limit` parameter (1-100) and paginate through results using the `page_token` returned from a previous call. Only non-empty parameters are sent in the request.
The block outputs the list of inbox objects, a count of inboxes returned, and a `next_page_token` for fetching additional pages. When there are no more results, `next_page_token` is empty. If the API call fails, the error propagates to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| limit | Maximum number of inboxes to return per page (1-100) | int | No |
| page_token | Token from a previous response to fetch the next page of results | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| inboxes | List of inbox objects, each containing inbox_id, email_address, display_name, etc. | List[Dict[str, Any]] |
| count | Total number of inboxes in your organization | int |
| next_page_token | Token to pass as page_token to get the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
**Organization Audit** — Enumerate all inboxes to generate a report of active agent email accounts and their configurations.
**Bulk Operations** — Iterate through every inbox to perform batch updates, such as rotating API keys or updating display names across all agents.
**Stale Inbox Detection** — List all inboxes and cross-reference with recent activity to identify accounts that can be cleaned up.
<!-- END MANUAL -->
---
## Agent Mail Update Inbox
### What it is
Update the display name of an AgentMail inbox. Changes the 'From' name shown when emails are sent.
### How it works
<!-- MANUAL: how_it_works -->
This block calls the AgentMail API to update an existing inbox's display name. It sends the inbox ID and the new display name to the update endpoint. The display name controls the "From" label recipients see when the agent sends emails.
The block outputs the inbox ID and the full updated inbox object as a dictionary. If the inbox does not exist or the request fails, the error propagates to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address to update (e.g. 'support@agentmail.to') | str | Yes |
| display_name | New display name for the inbox (e.g. 'Customer Support Bot') | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| inbox_id | The updated inbox ID | str |
| result | Complete updated inbox object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
**Rebranding an Agent** — Change the display name when an agent is reassigned from one team to another so outgoing emails reflect the new identity.
**Personalized Sender Names** — Update the display name dynamically to include a customer's name or ticket number for a more personal touch in automated replies.
**A/B Testing Email Identity** — Swap the display name between variations to test which sender identity gets higher open rates.
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,162 @@
# Agent Mail Lists
<!-- MANUAL: file_description -->
Blocks for managing allow/block lists in AgentMail. Lists let you control which email addresses and domains your agents can send to or receive from, based on direction (send/receive) and type (allow/block).
<!-- END MANUAL -->
## Agent Mail Create List Entry
### What it is
Add an email address or domain to an allow/block list. Block spam senders or whitelist trusted domains.
### How it works
<!-- MANUAL: how_it_works -->
This block calls the AgentMail API to add an email address or domain to a specified list. You select a direction ("send" or "receive") and a list type ("allow" or "block"), then provide the entry to add. For block lists, you can optionally include a reason such as "spam" or "competitor."
The API creates the entry and returns both the entry string and the complete entry object with metadata. Any errors propagate directly to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| direction | 'send' for outgoing email rules, 'receive' for incoming email rules | "send" \| "receive" | Yes |
| list_type | 'allow' to whitelist, 'block' to blacklist | "allow" \| "block" | Yes |
| entry | Email address (user@example.com) or domain (example.com) to add | str | Yes |
| reason | Reason for blocking (only used with block lists, e.g. 'spam', 'competitor') | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| entry | The email address or domain that was added | str |
| result | Complete entry object | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
**Spam Prevention** — Block a known spam domain so your agent never processes incoming messages from it.
**Trusted Partner Allowlisting** — Add a partner's domain to the receive allow list so their emails always reach your agent.
**Outbound Restriction** — Add a competitor's domain to the send block list to prevent your agent from accidentally emailing them.
<!-- END MANUAL -->
---
## Agent Mail Delete List Entry
### What it is
Remove an email address or domain from an allow/block list to stop filtering it.
### How it works
<!-- MANUAL: how_it_works -->
This block calls the AgentMail API to remove an existing entry from an allow or block list. You specify the direction, list type, and the entry to delete.
On success the block returns `success=True`. If the entry does not exist or the API call fails, the error propagates to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| direction | 'send' for outgoing rules, 'receive' for incoming rules | "send" \| "receive" | Yes |
| list_type | 'allow' for whitelist, 'block' for blacklist | "allow" \| "block" | Yes |
| entry | Email address or domain to remove from the list | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| success | True if the entry was successfully removed | bool |
### Possible use case
<!-- MANUAL: use_case -->
**Unblocking a Sender** — Remove a previously blocked domain after confirming it is no longer a spam source.
**Revoking Access** — Delete a domain from the receive allow list when a partnership ends and messages should no longer be accepted.
**Policy Correction** — Remove an entry that was added by mistake so normal email flow resumes immediately.
<!-- END MANUAL -->
---
## Agent Mail Get List Entry
### What it is
Check if an email address or domain is in an allow/block list. Verify filtering rules.
### How it works
<!-- MANUAL: how_it_works -->
This block queries the AgentMail API to check whether a specific email address or domain exists in an allow or block list. You provide the direction, list type, and the entry to look up.
If the entry is found, the block returns the entry string and the complete entry object with metadata (such as the reason it was added). If the entry does not exist, the error propagates to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| direction | 'send' for outgoing rules, 'receive' for incoming rules | "send" \| "receive" | Yes |
| list_type | 'allow' for whitelist, 'block' for blacklist | "allow" \| "block" | Yes |
| entry | Email address or domain to look up | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| entry | The email address or domain that was found | str |
| result | Complete entry object with metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
**Pre-Send Validation** — Check whether a recipient domain is on the send block list before your agent drafts an outbound email.
**Audit Verification** — Look up a specific address to confirm it was added to the block list and review the recorded reason.
**Conditional Routing** — Check the receive allow list for a sender's domain to decide whether to process the message or skip it.
<!-- END MANUAL -->
---
## Agent Mail List Entries
### What it is
List all entries in an AgentMail allow/block list. Choose send/receive direction and allow/block type.
### How it works
<!-- MANUAL: how_it_works -->
This block retrieves all entries from a specified allow or block list by calling the AgentMail API. You select a direction and list type, and optionally set a page size with `limit` and continue paginating with `page_token`.
The API returns the list of entry objects, a count of entries in the current page, and a `next_page_token` for fetching additional results. Any errors propagate to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| direction | 'send' to filter outgoing emails, 'receive' to filter incoming emails | "send" \| "receive" | Yes |
| list_type | 'allow' for whitelist (only permit these), 'block' for blacklist (reject these) | "allow" \| "block" | Yes |
| limit | Maximum number of entries to return per page | int | No |
| page_token | Token from a previous response to fetch the next page | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| entries | List of entries, each with an email address or domain | List[Dict[str, Any]] |
| count | Number of entries returned | int |
| next_page_token | Token for the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
**Policy Dashboard** — Fetch all entries on the receive block list to display a management view where administrators can review and remove entries.
**Periodic Cleanup** — Page through the full send allow list to identify stale entries that no longer correspond to active partners.
**Compliance Export** — Retrieve every block list entry across both directions to generate a report for an internal compliance audit.
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,247 @@
# Agent Mail Messages
<!-- MANUAL: file_description -->
Blocks for sending, receiving, replying to, forwarding, and managing email messages via AgentMail. Messages are individual emails within conversation threads.
<!-- END MANUAL -->
## Agent Mail Forward Message
### What it is
Forward an email message to one or more recipients. Supports CC/BCC and optional extra text or subject override.
### How it works
<!-- MANUAL: how_it_works -->
The block validates that the combined recipient count across to, cc, and bcc does not exceed 50, then calls the AgentMail API to forward a specific message from your inbox. You provide the inbox ID and message ID to identify the original email, along with the target email addresses. Optionally, you can override the subject line or prepend additional plain text or HTML content before the forwarded message body.
The API handles constructing the forwarded email with the original content included. Any errors from the API propagate directly to the global error handler without being caught by the block.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address to forward from | str | Yes |
| message_id | Message ID to forward | str | Yes |
| to | Recipient email addresses to forward the message to (e.g. ['user@example.com']) | List[str] | Yes |
| cc | CC recipient email addresses | List[str] | No |
| bcc | BCC recipient email addresses (hidden from other recipients) | List[str] | No |
| subject | Override the subject line (defaults to 'Fwd: <original subject>') | str | No |
| text | Additional plain text to prepend before the forwarded content | str | No |
| html | Additional HTML to prepend before the forwarded content | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | Unique identifier of the forwarded message | str |
| thread_id | Thread ID of the forward | str |
| result | Complete forwarded message object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
- **Escalation Routing** — Forward messages that match certain keywords or priority levels to a human supervisor's email for review.
- **Multi-Agent Collaboration** — Forward incoming requests to a specialized agent's inbox so the right agent handles each task.
- **Digest Distribution** — Forward summarized daily reports from an aggregation inbox to a distribution list of stakeholders.
<!-- END MANUAL -->
---
## Agent Mail Get Message
### What it is
Retrieve a specific email message by ID. Includes extracted_text for clean reply content without quoted history.
### How it works
<!-- MANUAL: how_it_works -->
The block fetches a single message from an AgentMail inbox by calling the API with the inbox ID and message ID. It returns the full message content including subject, plain text body, HTML body, and all metadata.
A key output is `extracted_text`, which contains only the new reply content with quoted history stripped out. This is especially useful when feeding message content into an LLM, since it avoids processing redundant quoted text from earlier in the conversation.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address the message belongs to | str | Yes |
| message_id | Message ID to retrieve (e.g. '<abc123@agentmail.to>') | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | Unique identifier of the message | str |
| thread_id | Thread this message belongs to | str |
| subject | Email subject line | str |
| text | Full plain text body (may include quoted reply history) | str |
| extracted_text | Just the new reply content with quoted history stripped. Best for AI processing. | str |
| html | HTML body of the email | str |
| result | Complete message object with all fields including sender, recipients, attachments, labels | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
- **Intent Classification** — Retrieve a message and pass its extracted text to an LLM to classify the sender's intent before routing to the appropriate workflow.
- **Conversation Context Loading** — Fetch a specific message to build context for generating a relevant reply in a multi-turn email conversation.
- **Attachment Processing** — Retrieve a message's full metadata to extract attachment URLs for downstream processing like document parsing or image analysis.
<!-- END MANUAL -->
---
## Agent Mail List Messages
### What it is
List messages in an AgentMail inbox. Filter by labels to find unread, campaign-tagged, or categorized messages.
### How it works
<!-- MANUAL: how_it_works -->
The block queries the AgentMail API to retrieve a paginated list of messages from the specified inbox. You can control the page size with `limit` (1-100) and navigate through results using the `page_token` returned from a previous call. An optional `labels` filter returns only messages that have all of the specified labels.
The block outputs the list of message objects, a count of messages returned in the current page, and a `next_page_token` for fetching subsequent pages. When `next_page_token` is empty, there are no more results.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address to list messages from | str | Yes |
| limit | Maximum number of messages to return per page (1-100) | int | No |
| page_token | Token from a previous response to fetch the next page | str | No |
| labels | Only return messages with ALL of these labels (e.g. ['unread'] or ['q4-campaign', 'follow-up']) | List[str] | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| messages | List of message objects with subject, sender, text, html, labels, etc. | List[Dict[str, Any]] |
| count | Number of messages returned | int |
| next_page_token | Token for the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
- **Inbox Polling** — Periodically list messages labeled "unread" to trigger automated processing workflows for new incoming emails.
- **Campaign Monitoring** — Filter messages by campaign-specific labels to track reply rates and engagement across an outreach sequence.
- **Batch Processing** — Page through all messages in an inbox to perform bulk operations like summarization, archiving, or data extraction.
<!-- END MANUAL -->
---
## Agent Mail Reply To Message
### What it is
Reply to an existing email in the same conversation thread. Use for multi-turn agent conversations.
### How it works
<!-- MANUAL: how_it_works -->
The block sends a reply to an existing message by calling the AgentMail API with the inbox ID, the message ID being replied to, and the reply body text. An optional HTML body can be provided for rich formatting. The API automatically threads the reply into the same conversation as the original message.
The block returns the new reply's message ID, the thread ID it was added to, and the complete message object. This makes it straightforward to build multi-turn email conversations where an agent responds to incoming messages within the same thread.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address to send the reply from | str | Yes |
| message_id | Message ID to reply to (e.g. '<abc123@agentmail.to>') | str | Yes |
| text | Plain text body of the reply | str | Yes |
| html | Rich HTML body of the reply | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | Unique identifier of the reply message | str |
| thread_id | Thread ID the reply was added to | str |
| result | Complete reply message object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
- **Customer Support Agent** — Automatically reply to incoming support emails with answers generated by an LLM based on the message content and a knowledge base.
- **Interview Scheduling** — Reply to candidate emails with proposed interview times after checking calendar availability through another block.
- **Conversational Workflow** — Maintain an ongoing back-and-forth conversation with a user, where each reply builds on the previous exchange to complete a multi-step task.
<!-- END MANUAL -->
---
## Agent Mail Send Message
### What it is
Send a new email from an AgentMail inbox. Creates a new conversation thread. Supports HTML, CC/BCC, and labels.
### How it works
<!-- MANUAL: how_it_works -->
The block first validates that the combined count of `to`, `cc`, and `bcc` recipients does not exceed 50. It then calls the AgentMail API to send a new message from the specified inbox, creating a new conversation thread. You must provide at least a plain text body; an optional HTML body can be included for rich formatting.
The block supports CC and BCC recipients for human-in-the-loop oversight or silent monitoring, and labels for tagging outgoing messages for later filtering. The API returns the new message's ID, the thread ID for tracking future replies, and the complete message object.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address to send from (e.g. 'agent@agentmail.to') | str | Yes |
| to | Recipient email addresses (e.g. ['user@example.com']) | List[str] | Yes |
| subject | Email subject line | str | Yes |
| text | Plain text body of the email. Always provide this as a fallback for email clients that don't render HTML. | str | Yes |
| html | Rich HTML body of the email. Embed CSS in a <style> tag for best compatibility across email clients. | str | No |
| cc | CC recipient email addresses for human-in-the-loop oversight | List[str] | No |
| bcc | BCC recipient email addresses (hidden from other recipients) | List[str] | No |
| labels | Labels to tag the message for filtering and state management (e.g. ['outreach', 'q4-campaign']) | List[str] | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | Unique identifier of the sent message | str |
| thread_id | Thread ID grouping this message and any future replies | str |
| result | Complete sent message object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
- **Outreach Campaigns** — Send personalized cold emails to a list of prospects with campaign labels for tracking, using HTML templates for professional formatting.
- **Alert Notifications** — Send automated alert emails when a monitored metric crosses a threshold, CC-ing a human operator for oversight.
- **Report Delivery** — Generate and send periodic summary reports to stakeholders with BCC to an archive inbox for record-keeping.
<!-- END MANUAL -->
---
## Agent Mail Update Message
### What it is
Add or remove labels on an email message. Use for read/unread tracking, campaign tagging, or state management.
### How it works
<!-- MANUAL: how_it_works -->
The block calls the AgentMail API to modify the labels on a specific message. You can add new labels, remove existing ones, or do both in a single call. Labels are arbitrary strings you define, making them flexible for tracking message state such as read/unread, processing status, or campaign membership.
The block returns the updated message ID and the complete message object reflecting the current label state. This is useful as a downstream step after processing a message, allowing you to mark it as handled so it is not picked up again by a polling workflow.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address the message belongs to | str | Yes |
| message_id | Message ID to update labels on | str | Yes |
| add_labels | Labels to add (e.g. ['read', 'processed', 'high-priority']) | List[str] | No |
| remove_labels | Labels to remove (e.g. ['unread', 'pending']) | List[str] | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| message_id | The updated message ID | str |
| result | Complete updated message object with current labels | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
- **Read/Unread Tracking** — Remove the "unread" label and add "read" after an agent processes a message, preventing duplicate processing on the next polling cycle.
- **Pipeline State Management** — Add labels like "sentiment-analyzed" or "response-drafted" as a message moves through multi-step processing, so each stage knows which messages still need work.
- **Priority Tagging** — Add a "high-priority" label to messages from VIP senders or containing urgent keywords, enabling downstream blocks to filter and handle them first.
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,299 @@
# Agent Mail Pods
<!-- MANUAL: file_description -->
Blocks for creating, managing, and querying pods in AgentMail. Pods provide multi-tenant isolation between customers — each pod acts as an isolated workspace containing its own inboxes, domains, threads, and drafts. Use pods when building SaaS platforms, agency tools, or AI agent fleets serving multiple customers.
<!-- END MANUAL -->
## Agent Mail Create Pod
### What it is
Create a new pod for multi-tenant customer isolation. Use client_id to map to your internal tenant IDs.
### How it works
<!-- MANUAL: how_it_works -->
Calls the AgentMail API to provision a new pod, passing an optional `client_id` parameter. When a `client_id` is provided, AgentMail maps it to the pod so you can later reference the pod using your own internal tenant identifier instead of the AgentMail-assigned `pod_id`.
The block returns the newly created `pod_id` along with the complete pod object containing all metadata. Any API errors propagate directly to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| client_id | Your internal tenant/customer ID for idempotent mapping. Lets you access the pod by your own ID instead of AgentMail's pod_id. | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| pod_id | Unique identifier of the created pod | str |
| result | Complete pod object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
- **SaaS Customer Onboarding** — Automatically provision an isolated email workspace when a new customer signs up for your platform.
- **AI Agent Fleet Management** — Create a dedicated pod for each AI agent so its email activity is fully isolated from other agents.
- **White-Label Email Service** — Spin up tenant-scoped pods mapped to your internal customer IDs to power a branded email product.
<!-- END MANUAL -->
---
## Agent Mail Create Pod Inbox
### What it is
Create a new email inbox within a pod. The inbox is scoped to the customer workspace.
### How it works
<!-- MANUAL: how_it_works -->
Calls the AgentMail API to create a new inbox scoped to the specified pod. You can optionally provide a `username`, `domain`, and `display_name` to customize the email address and sender identity. If omitted, the username is auto-generated and the domain defaults to `agentmail.to`.
The block returns the `inbox_id`, the full `email_address`, and the complete inbox metadata object. The inbox is fully isolated within the pod, meaning it only appears in that pod's inbox listings and its threads stay separate from other pods.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| pod_id | Pod ID to create the inbox in | str | Yes |
| username | Local part of the email address (e.g. 'support'). Leave empty to auto-generate. | str | No |
| domain | Email domain (e.g. 'mydomain.com'). Defaults to agentmail.to if empty. | str | No |
| display_name | Friendly name shown in the 'From' field (e.g. 'Customer Support') | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| inbox_id | Unique identifier of the created inbox | str |
| email_address | Full email address of the inbox | str |
| result | Complete inbox object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
- **Per-Customer Support Addresses** — Create a `support@clientdomain.com` inbox inside each customer's pod so inbound support emails are automatically routed to the right tenant.
- **Branded Outbound Campaigns** — Provision inboxes with custom display names and domains for each customer's marketing agent to send branded emails.
- **Multi-Department Agent Setup** — Create separate inboxes (sales, billing, support) within a single customer pod so different AI agents handle different functions.
<!-- END MANUAL -->
---
## Agent Mail Delete Pod
### What it is
Permanently delete a pod. All inboxes and domains must be removed first.
### How it works
<!-- MANUAL: how_it_works -->
Calls the AgentMail API to permanently delete the specified pod. The API enforces a precondition: all inboxes and custom domains must be removed from the pod before deletion is allowed. If any remain, the API returns an error that propagates to the global error handler.
On success the block returns `success=True`. This operation is irreversible -- the pod and its associated `client_id` mapping are permanently removed.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| pod_id | Pod ID to permanently delete (must have no inboxes or domains) | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| success | True if the pod was successfully deleted | bool |
### Possible use case
<!-- MANUAL: use_case -->
- **Customer Offboarding** — Automatically delete a customer's pod after they cancel their subscription and all their inboxes have been cleaned up.
- **Development Environment Cleanup** — Tear down temporary pods created during testing or staging so they do not accumulate over time.
- **Compliance Data Removal** — Permanently remove a tenant's email workspace as part of a GDPR or data-deletion request workflow.
<!-- END MANUAL -->
---
## Agent Mail Get Pod
### What it is
Retrieve details of an existing pod including its client_id mapping and metadata.
### How it works
<!-- MANUAL: how_it_works -->
Calls the AgentMail API with the given `pod_id` to fetch the full pod record. The returned object includes the pod's `client_id` mapping, creation timestamp, and any other metadata stored on the pod.
The block outputs both the `pod_id` and the complete result dictionary. If the pod does not exist, the API error propagates directly to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| pod_id | Pod ID to retrieve | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| pod_id | Unique identifier of the pod | str |
| result | Complete pod object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
- **Tenant Dashboard Display** — Fetch pod details to show a customer's workspace status, creation date, and associated client ID on an admin dashboard.
- **Pre-Action Validation** — Retrieve pod metadata before performing operations like inbox creation to confirm the pod exists and is correctly mapped.
- **Audit Logging** — Pull pod details as part of an automated audit trail that records which tenant workspace was accessed and when.
<!-- END MANUAL -->
---
## Agent Mail List Pod Drafts
### What it is
List all drafts across all inboxes within a pod. View pending emails for a customer.
### How it works
<!-- MANUAL: how_it_works -->
Calls the AgentMail API to retrieve drafts across all inboxes within the specified pod. Optional `limit` and `page_token` parameters control pagination. Only non-empty parameters are sent to the API.
The block returns the list of draft objects, a `count` of drafts in the current page, and a `next_page_token` for fetching subsequent pages. This provides a pod-wide view of unsent emails without needing to query each inbox individually.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| pod_id | Pod ID to list drafts from | str | Yes |
| limit | Maximum number of drafts to return per page (1-100) | int | No |
| page_token | Token from a previous response to fetch the next page | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| drafts | List of draft objects from all inboxes in this pod | List[Dict[str, Any]] |
| count | Number of drafts returned | int |
| next_page_token | Token for the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
- **Draft Review Queue** — Surface all pending drafts across a customer's inboxes so a human reviewer can approve or discard them before sending.
- **Stuck Draft Detection** — Periodically list pod drafts to find emails that have been sitting unsent for too long and alert the responsible agent or operator.
- **Customer Activity Summary** — Include draft counts in a tenant dashboard to show how many outbound emails are queued and awaiting dispatch.
<!-- END MANUAL -->
---
## Agent Mail List Pod Inboxes
### What it is
List all inboxes within a pod. View email accounts scoped to a specific customer.
### How it works
<!-- MANUAL: how_it_works -->
Calls the AgentMail API to list all inboxes belonging to the specified pod. Optional `limit` and `page_token` parameters enable paginated retrieval. Only non-empty parameters are included in the API call.
The block returns the list of inbox objects, a `count` of inboxes in the current page, and a `next_page_token` for fetching additional pages. Each inbox object includes its ID, email address, display name, and other metadata.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| pod_id | Pod ID to list inboxes from | str | Yes |
| limit | Maximum number of inboxes to return per page (1-100) | int | No |
| page_token | Token from a previous response to fetch the next page | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| inboxes | List of inbox objects within this pod | List[Dict[str, Any]] |
| count | Number of inboxes returned | int |
| next_page_token | Token for the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
- **Customer Inbox Inventory** — Display all email addresses belonging to a tenant on their settings page so they can manage or remove unused inboxes.
- **Pre-Deletion Validation** — List a pod's inboxes before attempting to delete the pod, ensuring all inboxes have been removed as required by the API.
- **Multi-Inbox Routing Overview** — Show operators which inboxes exist in a customer's pod so they can configure routing rules for each address.
<!-- END MANUAL -->
---
## Agent Mail List Pod Threads
### What it is
List all conversation threads across all inboxes within a pod. View all email activity for a customer.
### How it works
<!-- MANUAL: how_it_works -->
Calls the AgentMail API to retrieve conversation threads across all inboxes in the specified pod. Supports optional `limit`, `page_token`, and `labels` parameters. When `labels` are provided, only threads matching all specified labels are returned.
The block returns the list of thread objects, a `count` for the current page, and a `next_page_token` for pagination. This gives a unified, cross-inbox view of all email conversations within a customer's workspace.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| pod_id | Pod ID to list threads from | str | Yes |
| limit | Maximum number of threads to return per page (1-100) | int | No |
| page_token | Token from a previous response to fetch the next page | str | No |
| labels | Only return threads matching ALL of these labels | List[str] | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| threads | List of thread objects from all inboxes in this pod | List[Dict[str, Any]] |
| count | Number of threads returned | int |
| next_page_token | Token for the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
- **Unified Customer Inbox View** — Aggregate all email threads from every inbox in a customer's pod into a single activity feed for support agents or dashboards.
- **Label-Based Ticket Triage** — Filter pod threads by labels like "urgent" or "billing" to route conversations to the appropriate AI agent or human team.
- **Conversation Volume Monitoring** — Periodically list pod threads to track email volume per tenant and trigger alerts when activity spikes or drops.
<!-- END MANUAL -->
---
## Agent Mail List Pods
### What it is
List all tenant pods in your organization. See all customer workspaces at a glance.
### How it works
<!-- MANUAL: how_it_works -->
Calls the AgentMail API to list all pods in your organization. Optional `limit` and `page_token` parameters control pagination. Only non-empty parameters are included in the request.
The block returns a list of pod objects (each containing `pod_id`, `client_id`, creation time, and other metadata), a `count` for the current page, and a `next_page_token` for retrieving additional pages.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| limit | Maximum number of pods to return per page (1-100) | int | No |
| page_token | Token from a previous response to fetch the next page | str | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| pods | List of pod objects with pod_id, client_id, creation time, etc. | List[Dict[str, Any]] |
| count | Number of pods returned | int |
| next_page_token | Token for the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
- **Admin Tenant Overview** — Display all customer pods on an internal admin dashboard so operators can monitor workspace count and health at a glance.
- **Automated Tenant Reconciliation** — Periodically list all pods and compare against your internal customer database to detect orphaned or missing workspaces.
- **Usage Reporting** — Enumerate all pods to generate per-tenant usage reports or billing summaries based on workspace activity.
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,189 @@
# Agent Mail Threads
<!-- MANUAL: file_description -->
Blocks for listing, retrieving, and deleting conversation threads in AgentMail. Threads group related messages into a single conversation and can be queried per-inbox or across the entire organization.
<!-- END MANUAL -->
## Agent Mail Delete Inbox Thread
### What it is
Permanently delete a conversation thread and all its messages. This action cannot be undone.
### How it works
<!-- MANUAL: how_it_works -->
The block calls the AgentMail API to permanently delete a thread and all of its messages from the specified inbox. It requires both the inbox ID (or email address) and the thread ID.
On success the block outputs `success=True`. If the API returns an error (for example, the thread does not exist), the error propagates to the global error handler and the block outputs an error message instead.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address the thread belongs to | str | Yes |
| thread_id | Thread ID to permanently delete | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| success | True if the thread was successfully deleted | bool |
### Possible use case
<!-- MANUAL: use_case -->
- **GDPR Data Removal** — Permanently delete conversation threads when a user requests erasure of their personal data.
- **Spam Cleanup** — Automatically remove threads flagged as spam by an upstream classification block.
- **Conversation Archival Pipeline** — Delete threads from the live inbox after they have been exported to long-term storage.
<!-- END MANUAL -->
---
## Agent Mail Get Inbox Thread
### What it is
Retrieve a conversation thread with all its messages. Use for getting full conversation context before replying.
### How it works
<!-- MANUAL: how_it_works -->
The block fetches a single thread from a specific inbox by calling the AgentMail API with the inbox ID and thread ID. It returns the thread ID, the full list of messages in chronological order, and the complete thread object as a dictionary.
Any API error (such as an invalid thread ID or insufficient permissions) propagates to the global error handler, and the block outputs an error message.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address the thread belongs to | str | Yes |
| thread_id | Thread ID to retrieve | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| thread_id | Unique identifier of the thread | str |
| messages | All messages in the thread, in chronological order | List[Dict[str, Any]] |
| result | Complete thread object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
- **Context-Aware Replies** — Retrieve the full conversation history before generating an AI-drafted reply to ensure continuity.
- **Conversation Summarization** — Pull all messages in a thread and pass them to a summarization block for a digest.
- **Support Ticket Review** — Fetch a specific customer thread so a QA agent can evaluate response quality.
<!-- END MANUAL -->
---
## Agent Mail Get Org Thread
### What it is
Retrieve a conversation thread by ID from anywhere in the organization, without needing the inbox ID.
### How it works
<!-- MANUAL: how_it_works -->
The block performs an organization-wide thread lookup by calling the AgentMail API with only the thread ID. Unlike the inbox-scoped variant, no inbox ID is required because the API resolves the thread across all inboxes in the organization.
It returns the thread ID, all messages in chronological order, and the complete thread object. Errors propagate to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| thread_id | Thread ID to retrieve (works across all inboxes) | str | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| thread_id | Unique identifier of the thread | str |
| messages | All messages in the thread, in chronological order | List[Dict[str, Any]] |
| result | Complete thread object with all metadata | Dict[str, Any] |
### Possible use case
<!-- MANUAL: use_case -->
- **Cross-Inbox Thread Tracking** — Look up a thread by ID when the originating inbox is unknown, such as from a webhook or external reference.
- **Supervisor Agent Oversight** — Allow a manager agent to inspect any conversation across the organization without needing inbox-level routing.
- **Audit and Compliance** — Retrieve a specific thread for compliance review when only the thread ID is available from a log or report.
<!-- END MANUAL -->
---
## Agent Mail List Inbox Threads
### What it is
List all conversation threads in an AgentMail inbox. Filter by labels for campaign tracking or status management.
### How it works
<!-- MANUAL: how_it_works -->
The block lists conversation threads within a single inbox by calling the AgentMail API with the inbox ID and optional pagination and filtering parameters. You can set a limit (1-100), pass a page token for pagination, and filter by labels so that only threads matching all specified labels are returned.
The block outputs the list of thread objects, the count of threads returned in this page, and a next-page token for retrieving additional results. Errors propagate to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| inbox_id | Inbox ID or email address to list threads from | str | Yes |
| limit | Maximum number of threads to return per page (1-100) | int | No |
| page_token | Token from a previous response to fetch the next page | str | No |
| labels | Only return threads matching ALL of these labels (e.g. ['q4-campaign', 'follow-up']) | List[str] | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| threads | List of thread objects with thread_id, subject, message count, labels, etc. | List[Dict[str, Any]] |
| count | Number of threads returned | int |
| next_page_token | Token for the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
- **Inbox Dashboard** — List all threads in a support inbox to display an overview of active conversations.
- **Campaign Monitoring** — Filter threads by a campaign label to track how many conversations a specific outreach effort has generated.
- **Stale Thread Detection** — Paginate through all threads in an inbox to identify conversations that have not received a reply within a set time window.
<!-- END MANUAL -->
---
## Agent Mail List Org Threads
### What it is
List threads across ALL inboxes in your organization. Use for supervisor agents, dashboards, or cross-agent monitoring.
### How it works
<!-- MANUAL: how_it_works -->
The block lists threads across all inboxes in the organization by calling the AgentMail API without an inbox ID. It accepts optional limit, page-token, and label-filter parameters, which are forwarded directly to the API.
Results include threads from every inbox the organization owns. The block outputs the list of thread objects, the count for the current page, and a next-page token for pagination. Errors propagate to the global error handler.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| limit | Maximum number of threads to return per page (1-100) | int | No |
| page_token | Token from a previous response to fetch the next page | str | No |
| labels | Only return threads matching ALL of these labels | List[str] | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| threads | List of thread objects from all inboxes in the organization | List[Dict[str, Any]] |
| count | Number of threads returned | int |
| next_page_token | Token for the next page. Empty if no more results. | str |
### Possible use case
<!-- MANUAL: use_case -->
- **Organization-Wide Activity Feed** — Build a real-time dashboard showing the latest conversations across every agent inbox.
- **Cross-Agent Analytics** — Aggregate thread counts and labels across all inboxes to measure overall communication volume and topic distribution.
- **Escalation Routing** — Scan all org threads for a specific label (e.g., "urgent") and route matching threads to a dedicated escalation agent.
<!-- END MANUAL -->
---

View File

@@ -1,343 +0,0 @@
# Workspace & Media File Architecture
This document describes the architecture for handling user files in AutoGPT Platform, covering persistent user storage (Workspace) and ephemeral media processing pipelines.
## Overview
The platform has two distinct file-handling layers:
| Layer | Purpose | Persistence | Scope |
|-------|---------|-------------|-------|
| **Workspace** | Long-term user file storage | Persistent (DB + GCS/local) | Per-user, session-scoped access |
| **Media Pipeline** | Ephemeral file processing for blocks | Temporary (local disk) | Per-execution |
## Database Models
### UserWorkspace
Represents a user's file storage space. Created on-demand (one per user).
```prisma
model UserWorkspace {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
userId String @unique
Files UserWorkspaceFile[]
}
```
**Key points:**
- One workspace per user (enforced by `@unique` on `userId`)
- Created lazily via `get_or_create_workspace()`
- Uses upsert to handle race conditions
### UserWorkspaceFile
Represents a file stored in a user's workspace.
```prisma
model UserWorkspaceFile {
id String @id @default(uuid())
workspaceId String
name String // User-visible filename
path String // Virtual path (e.g., "/sessions/abc123/image.png")
storagePath String // Actual storage path (gcs://... or local://...)
mimeType String
sizeBytes BigInt
checksum String? // SHA256 for integrity
isDeleted Boolean @default(false)
deletedAt DateTime?
metadata Json @default("{}")
@@unique([workspaceId, path]) // Enforce unique paths within workspace
}
```
**Key points:**
- `path` is a virtual path for organizing files (not actual filesystem path)
- `storagePath` contains the actual GCS or local storage location
- Soft-delete pattern: `isDeleted` flag with `deletedAt` timestamp
- Path is modified on delete to free up the virtual path for reuse
---
## WorkspaceManager
**Location:** `backend/util/workspace.py`
High-level API for workspace file operations. Combines storage backend operations with database record management.
### Initialization
```python
from backend.util.workspace import WorkspaceManager
# Basic usage
manager = WorkspaceManager(user_id="user-123", workspace_id="ws-456")
# With session scoping (CoPilot sessions)
manager = WorkspaceManager(
user_id="user-123",
workspace_id="ws-456",
session_id="session-789"
)
```
### Session Scoping
When `session_id` is provided, files are isolated to `/sessions/{session_id}/`:
```python
# With session_id="abc123":
manager.write_file(content, "image.png")
# → stored at /sessions/abc123/image.png
# Cross-session access is explicit:
manager.read_file("/sessions/other-session/file.txt") # Works
```
**Why session scoping?**
- CoPilot conversations need file isolation
- Prevents file collisions between concurrent sessions
- Allows session cleanup without affecting other sessions
### Core Methods
| Method | Description |
|--------|-------------|
| `write_file(content, filename, path?, mime_type?, overwrite?)` | Write file to workspace |
| `read_file(path)` | Read file by virtual path |
| `read_file_by_id(file_id)` | Read file by ID |
| `list_files(path?, limit?, offset?, include_all_sessions?)` | List files |
| `delete_file(file_id)` | Soft-delete a file |
| `get_download_url(file_id, expires_in?)` | Get signed download URL |
| `get_file_info(file_id)` | Get file metadata |
| `get_file_info_by_path(path)` | Get file metadata by path |
| `get_file_count(path?, include_all_sessions?)` | Count files |
### Storage Backends
WorkspaceManager delegates to `WorkspaceStorageBackend`:
| Backend | When Used | Storage Path Format |
|---------|-----------|---------------------|
| `GCSWorkspaceStorage` | `media_gcs_bucket_name` is configured | `gcs://bucket/workspaces/{ws_id}/{file_id}/{filename}` |
| `LocalWorkspaceStorage` | No GCS bucket configured | `local://{ws_id}/{file_id}/{filename}` |
---
## store_media_file()
**Location:** `backend/util/file.py`
The media normalization pipeline. Handles various input types and normalizes them for processing or output.
### Purpose
Blocks receive files in many formats (URLs, data URIs, workspace references, local paths). `store_media_file()` normalizes these to a consistent format based on what the block needs.
### Input Types Handled
| Input Format | Example | How It's Processed |
|--------------|---------|-------------------|
| Data URI | `data:image/png;base64,iVBOR...` | Decoded, virus scanned, written locally |
| HTTP(S) URL | `https://example.com/image.png` | Downloaded, virus scanned, written locally |
| Workspace URI | `workspace://abc123` or `workspace:///path/to/file` | Read from workspace, virus scanned, written locally |
| Cloud path | `gcs://bucket/path` | Downloaded, virus scanned, written locally |
| Local path | `image.png` | Verified to exist in exec_file directory |
### Return Formats
The `return_format` parameter determines what you get back:
```python
from backend.util.file import store_media_file
# For local processing (ffmpeg, MoviePy, PIL)
local_path = await store_media_file(
file=input_file,
execution_context=ctx,
return_format="for_local_processing"
)
# Returns: "image.png" (relative path in exec_file dir)
# For external APIs (Replicate, OpenAI, etc.)
data_uri = await store_media_file(
file=input_file,
execution_context=ctx,
return_format="for_external_api"
)
# Returns: "data:image/png;base64,iVBOR..."
# For block output (adapts to execution context)
output = await store_media_file(
file=input_file,
execution_context=ctx,
return_format="for_block_output"
)
# In CoPilot: Returns "workspace://file-id#image/png"
# In graphs: Returns "data:image/png;base64,..."
```
### Execution Context
`store_media_file()` requires an `ExecutionContext` with:
- `graph_exec_id` - Required for temp file location
- `user_id` - Required for workspace access
- `workspace_id` - Optional; enables workspace features
- `session_id` - Optional; for session scoping in CoPilot
---
## Responsibility Boundaries
### Virus Scanning
| Component | Scans? | Notes |
|-----------|--------|-------|
| `store_media_file()` | ✅ Yes | Scans **all** content before writing to local disk |
| `WorkspaceManager.write_file()` | ✅ Yes | Scans content before persisting |
**Scanning happens at:**
1. `store_media_file()` — scans everything it downloads/decodes
2. `WorkspaceManager.write_file()` — scans before persistence
Tools like `WriteWorkspaceFileTool` don't need to scan because `WorkspaceManager.write_file()` handles it.
### Persistence
| Component | Persists To | Lifecycle |
|-----------|-------------|-----------|
| `store_media_file()` | Temp dir (`/tmp/exec_file/{exec_id}/`) | Cleaned after execution |
| `WorkspaceManager` | GCS or local storage + DB | Persistent until deleted |
**Automatic cleanup:** `clean_exec_files(graph_exec_id)` removes temp files after execution completes.
---
## Decision Tree: WorkspaceManager vs store_media_file
```text
┌─────────────────────────────────────────────────────┐
│ What do you need to do with the file? │
└─────────────────────────────────────────────────────┘
┌─────────────┴─────────────┐
▼ ▼
Process in a block Store for user access
(ffmpeg, PIL, etc.) (CoPilot files, uploads)
│ │
▼ ▼
store_media_file() WorkspaceManager
with appropriate
return_format
┌──────┴──────┐
▼ ▼
"for_local_ "for_block_
processing" output"
│ │
▼ ▼
Get local Auto-saves to
path for workspace in
tools CoPilot context
Store for user access
├── write_file() ─── Upload + persist (scans internally)
├── read_file() / get_download_url() ─── Retrieve
└── list_files() / delete_file() ─── Manage
```
### Quick Reference
| Scenario | Use |
|----------|-----|
| Block needs to process a file with ffmpeg | `store_media_file(..., return_format="for_local_processing")` |
| Block needs to send file to external API | `store_media_file(..., return_format="for_external_api")` |
| Block returning a generated file | `store_media_file(..., return_format="for_block_output")` |
| API endpoint handling file upload | `WorkspaceManager.write_file()` (handles virus scanning internally) |
| API endpoint serving file download | `WorkspaceManager.get_download_url()` |
| Listing user's files | `WorkspaceManager.list_files()` |
---
## Key Files Reference
| File | Purpose |
|------|---------|
| `backend/data/workspace.py` | Database CRUD operations for UserWorkspace and UserWorkspaceFile |
| `backend/util/workspace.py` | `WorkspaceManager` class - high-level workspace API |
| `backend/util/workspace_storage.py` | Storage backends (GCS, local) and `WorkspaceStorageBackend` interface |
| `backend/util/file.py` | `store_media_file()` and media processing utilities |
| `backend/util/virus_scanner.py` | `VirusScannerService` and `scan_content_safe()` |
| `schema.prisma` | Database model definitions |
---
## Common Patterns
### Block Processing a User's File
```python
async def run(self, input_data, *, execution_context, **kwargs):
# Normalize input to local path
local_path = await store_media_file(
file=input_data.video,
execution_context=execution_context,
return_format="for_local_processing",
)
# Process with local tools
output_path = process_video(local_path)
# Return (auto-saves to workspace in CoPilot)
result = await store_media_file(
file=output_path,
execution_context=execution_context,
return_format="for_block_output",
)
yield "output", result
```
### API Upload Endpoint
```python
from backend.util.virus_scanner import VirusDetectedError, VirusScanError
async def upload_file(file: UploadFile, user_id: str, workspace_id: str):
content = await file.read()
# write_file handles virus scanning internally
manager = WorkspaceManager(user_id, workspace_id)
try:
workspace_file = await manager.write_file(
content=content,
filename=file.filename,
)
except VirusDetectedError:
raise HTTPException(status_code=400, detail="File rejected: virus detected")
except VirusScanError:
raise HTTPException(status_code=503, detail="Virus scanning unavailable")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
return {"file_id": workspace_file.id}
```
---
## Configuration
| Setting | Purpose | Default |
|---------|---------|---------|
| `media_gcs_bucket_name` | GCS bucket for workspace storage | None (uses local) |
| `workspace_storage_dir` | Local storage directory | `{app_data}/workspaces` |
| `max_file_size_mb` | Maximum file size in MB | 100 |
| `clamav_service_enabled` | Enable virus scanning | true |
| `clamav_service_host` | ClamAV daemon host | localhost |
| `clamav_service_port` | ClamAV daemon port | 3310 |
| `clamav_max_concurrency` | Max concurrent scans to ClamAV daemon | 5 |
| `clamav_mark_failed_scans_as_clean` | If true, scan failures pass content through instead of rejecting (⚠️ security risk if ClamAV is unreachable) | false |