mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
feat(block): Enhance Mem0 blocks filetering & add more GoogleSheets blocks (#10287)
The block library was missing two key capabilities that keep coming up
in real-world agent flows:
1. **Granular Mem0 filtering.** Agents often work side-by-side for the
same user, so memories must be scoped to a specific run or agent to
avoid crosstalk.
2. **First-class Google Sheets support.** Many community projects (e.g.,
data-collection, lightweight dashboards, no-code workflows) rely on
Sheets, but we only had a brittle REST call block.
This PR adds fine-grained filters to every Mem0 retrieval block and
introduces a complete, OAuth-ready Google Sheets suite so agents can
create, read, write, format, and manage spreadsheets safely.
:contentReference[oaicite:0]{index=0}
---
### Changes 🏗️
#### 📚 Mem0 block enhancements
* Added `categories_filter`, `metadata_filter`, `limit_memory_to_run`,
and `limit_memory_to_agent` inputs to **SearchMemoryBlock**,
**GetAllMemoriesBlock**, and **GetLatestMemoryBlock**.
* Added identical scoping logic to **AddMemoryBlock** so newly-created
memories can be tied to run/agent IDs.
#### 📊 New Google Sheets blocks (`backend/blocks/google/sheets.py`)
| Block | Purpose |
|-------|---------|
| `GoogleSheetsReadBlock` | Read a range |
| `GoogleSheetsWriteBlock` | Overwrite a range |
| `GoogleSheetsAppendBlock` | Append rows |
| `GoogleSheetsClearBlock` | Clear a range |
| `GoogleSheetsMetadataBlock` | Fetch spreadsheet + sheet info |
| `GoogleSheetsManageSheetBlock` | Create / delete / copy a sheet |
| `GoogleSheetsBatchOperationsBlock` | Batch update / clear |
| `GoogleSheetsFindReplaceBlock` | Find & replace text |
| `GoogleSheetsFormatBlock` | Cell formatting (bg/text colour, bold,
italic, font size) |
| `GoogleSheetsCreateSpreadsheetBlock` | Spin up a new spreadsheet |
* Each block has typed input/output schemas, built-in test mocks, and is
disabled in prod unless Google OAuth is configured.
* Added helper enums (`SheetOperation`, `BatchOperationType`) and
updated **CLAUDE.md** contributor guide with a UUID-generation step.
:contentReference[oaicite:2]{index=2}
---
### Checklist 📋
#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
<!-- Put your test plan here: -->
- [x] Manual E2E run: agent writes chat summary to new spreadsheet,
reads it back, searches memory with run-scoped filter
- [x] Live Google API smoke-tests (read/write/append/clear/format) using
a disposable spreadsheet
This commit is contained in:
@@ -121,6 +121,7 @@ Key models (defined in `/backend/schema.prisma`):
|
||||
3. Define input/output schemas
|
||||
4. Implement `run` method
|
||||
5. Register in block registry
|
||||
6. Generate the block uuid using `uuid.uuid4()`
|
||||
|
||||
**Modifying the API:**
|
||||
1. Update route in `/backend/backend/server/routers/`
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
from enum import Enum
|
||||
|
||||
from google.oauth2.credentials import Credentials
|
||||
from googleapiclient.discovery import build
|
||||
@@ -16,6 +17,102 @@ from ._auth import (
|
||||
GoogleCredentialsInput,
|
||||
)
|
||||
|
||||
settings = Settings()
|
||||
GOOGLE_SHEETS_DISABLED = (
|
||||
not GOOGLE_OAUTH_IS_CONFIGURED
|
||||
or settings.config.app_env == AppEnvironment.PRODUCTION
|
||||
)
|
||||
|
||||
|
||||
def parse_a1_notation(a1: str) -> tuple[str | None, str]:
|
||||
"""Split an A1‑notation string into *(sheet_name, cell_range)*.
|
||||
|
||||
Examples
|
||||
--------
|
||||
>>> parse_a1_notation("Sheet1!A1:B2")
|
||||
("Sheet1", "A1:B2")
|
||||
>>> parse_a1_notation("A1:B2")
|
||||
(None, "A1:B2")
|
||||
"""
|
||||
|
||||
if "!" in a1:
|
||||
sheet, cell_range = a1.split("!", 1)
|
||||
return sheet, cell_range
|
||||
return None, a1
|
||||
|
||||
|
||||
def _first_sheet_meta(service, spreadsheet_id: str) -> tuple[str, int]:
|
||||
"""Return *(title, sheetId)* for the first sheet in *spreadsheet_id*."""
|
||||
|
||||
meta = (
|
||||
service.spreadsheets()
|
||||
.get(spreadsheetId=spreadsheet_id, includeGridData=False)
|
||||
.execute()
|
||||
)
|
||||
first = meta["sheets"][0]["properties"]
|
||||
return first["title"], first["sheetId"]
|
||||
|
||||
|
||||
def resolve_sheet_name(service, spreadsheet_id: str, sheet_name: str | None) -> str:
|
||||
"""Resolve *sheet_name*, falling back to the workbook's first sheet if empty."""
|
||||
|
||||
if sheet_name:
|
||||
return sheet_name
|
||||
title, _ = _first_sheet_meta(service, spreadsheet_id)
|
||||
return title
|
||||
|
||||
|
||||
def sheet_id_by_name(service, spreadsheet_id: str, sheet_name: str) -> int | None:
|
||||
"""Return the *sheetId* for *sheet_name* (or `None` if not found)."""
|
||||
|
||||
meta = service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
|
||||
for sh in meta.get("sheets", []):
|
||||
if sh.get("properties", {}).get("title") == sheet_name:
|
||||
return sh["properties"]["sheetId"]
|
||||
return None
|
||||
|
||||
|
||||
def _build_sheets_service(credentials: GoogleCredentials):
|
||||
settings = Settings()
|
||||
creds = Credentials(
|
||||
token=(
|
||||
credentials.access_token.get_secret_value()
|
||||
if credentials.access_token
|
||||
else None
|
||||
),
|
||||
refresh_token=(
|
||||
credentials.refresh_token.get_secret_value()
|
||||
if credentials.refresh_token
|
||||
else None
|
||||
),
|
||||
token_uri="https://oauth2.googleapis.com/token",
|
||||
client_id=settings.secrets.google_client_id,
|
||||
client_secret=settings.secrets.google_client_secret,
|
||||
scopes=credentials.scopes,
|
||||
)
|
||||
return build("sheets", "v4", credentials=creds)
|
||||
|
||||
|
||||
class SheetOperation(str, Enum):
|
||||
CREATE = "create"
|
||||
DELETE = "delete"
|
||||
COPY = "copy"
|
||||
|
||||
|
||||
class BatchOperationType(str, Enum):
|
||||
UPDATE = "update"
|
||||
CLEAR = "clear"
|
||||
|
||||
|
||||
class BatchOperation(BlockSchema):
|
||||
type: BatchOperationType = SchemaField(
|
||||
description="The type of operation to perform"
|
||||
)
|
||||
range: str = SchemaField(description="The A1 notation range for the operation")
|
||||
values: list[list[str]] = SchemaField(
|
||||
description="Values to update (only for UPDATE)", default=[]
|
||||
)
|
||||
|
||||
|
||||
class GoogleSheetsReadBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
@@ -38,15 +135,13 @@ class GoogleSheetsReadBlock(Block):
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
settings = Settings()
|
||||
super().__init__(
|
||||
id="5724e902-3635-47e9-a108-aaa0263a4988",
|
||||
description="This block reads data from a Google Sheets spreadsheet.",
|
||||
categories={BlockCategory.DATA},
|
||||
input_schema=GoogleSheetsReadBlock.Input,
|
||||
output_schema=GoogleSheetsReadBlock.Output,
|
||||
disabled=not GOOGLE_OAUTH_IS_CONFIGURED
|
||||
or settings.config.app_env == AppEnvironment.PRODUCTION,
|
||||
disabled=GOOGLE_SHEETS_DISABLED,
|
||||
test_input={
|
||||
"spreadsheet_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
|
||||
"range": "Sheet1!A1:B2",
|
||||
@@ -73,32 +168,12 @@ class GoogleSheetsReadBlock(Block):
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
service = self._build_service(credentials, **kwargs)
|
||||
service = _build_sheets_service(credentials)
|
||||
data = await asyncio.to_thread(
|
||||
self._read_sheet, service, input_data.spreadsheet_id, input_data.range
|
||||
)
|
||||
yield "result", data
|
||||
|
||||
@staticmethod
|
||||
def _build_service(credentials: GoogleCredentials, **kwargs):
|
||||
creds = Credentials(
|
||||
token=(
|
||||
credentials.access_token.get_secret_value()
|
||||
if credentials.access_token
|
||||
else None
|
||||
),
|
||||
refresh_token=(
|
||||
credentials.refresh_token.get_secret_value()
|
||||
if credentials.refresh_token
|
||||
else None
|
||||
),
|
||||
token_uri="https://oauth2.googleapis.com/token",
|
||||
client_id=Settings().secrets.google_client_id,
|
||||
client_secret=Settings().secrets.google_client_secret,
|
||||
scopes=credentials.scopes,
|
||||
)
|
||||
return build("sheets", "v4", credentials=creds)
|
||||
|
||||
def _read_sheet(self, service, spreadsheet_id: str, range: str) -> list[list[str]]:
|
||||
sheet = service.spreadsheets()
|
||||
result = sheet.values().get(spreadsheetId=spreadsheet_id, range=range).execute()
|
||||
@@ -135,7 +210,7 @@ class GoogleSheetsWriteBlock(Block):
|
||||
categories={BlockCategory.DATA},
|
||||
input_schema=GoogleSheetsWriteBlock.Input,
|
||||
output_schema=GoogleSheetsWriteBlock.Output,
|
||||
disabled=not GOOGLE_OAUTH_IS_CONFIGURED,
|
||||
disabled=GOOGLE_SHEETS_DISABLED,
|
||||
test_input={
|
||||
"spreadsheet_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
|
||||
"range": "Sheet1!A1:B2",
|
||||
@@ -164,7 +239,7 @@ class GoogleSheetsWriteBlock(Block):
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
service = GoogleSheetsReadBlock._build_service(credentials, **kwargs)
|
||||
service = _build_sheets_service(credentials)
|
||||
result = await asyncio.to_thread(
|
||||
self._write_sheet,
|
||||
service,
|
||||
@@ -190,3 +265,790 @@ class GoogleSheetsWriteBlock(Block):
|
||||
.execute()
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
class GoogleSheetsAppendBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: GoogleCredentialsInput = GoogleCredentialsField(
|
||||
["https://www.googleapis.com/auth/spreadsheets"]
|
||||
)
|
||||
spreadsheet_id: str = SchemaField(description="Spreadsheet ID")
|
||||
sheet_name: str = SchemaField(
|
||||
description="Optional sheet to append to (defaults to first sheet)",
|
||||
default="",
|
||||
)
|
||||
values: list[list[str]] = SchemaField(description="Rows to append")
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: dict = SchemaField(description="Append API response")
|
||||
error: str = SchemaField(description="Error message, if any")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="531d50c0-d6b9-4cf9-a013-7bf783d313c7",
|
||||
description="Append data to a Google Sheet (sheet optional)",
|
||||
categories={BlockCategory.DATA},
|
||||
input_schema=GoogleSheetsAppendBlock.Input,
|
||||
output_schema=GoogleSheetsAppendBlock.Output,
|
||||
disabled=GOOGLE_SHEETS_DISABLED,
|
||||
test_input={
|
||||
"spreadsheet_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
|
||||
"values": [["Charlie", "95"]],
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
test_output=[
|
||||
("result", {"updatedCells": 2, "updatedColumns": 2, "updatedRows": 1}),
|
||||
],
|
||||
test_mock={
|
||||
"_append_sheet": lambda *args, **kwargs: {
|
||||
"updatedCells": 2,
|
||||
"updatedColumns": 2,
|
||||
"updatedRows": 1,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
service = _build_sheets_service(credentials)
|
||||
result = await asyncio.to_thread(
|
||||
self._append_sheet,
|
||||
service,
|
||||
input_data.spreadsheet_id,
|
||||
input_data.sheet_name,
|
||||
input_data.values,
|
||||
)
|
||||
yield "result", result
|
||||
|
||||
def _append_sheet(
|
||||
self,
|
||||
service,
|
||||
spreadsheet_id: str,
|
||||
sheet_name: str,
|
||||
values: list[list[str]],
|
||||
) -> dict:
|
||||
target_sheet = resolve_sheet_name(service, spreadsheet_id, sheet_name)
|
||||
body = {"values": values}
|
||||
return (
|
||||
service.spreadsheets()
|
||||
.values()
|
||||
.append(
|
||||
spreadsheetId=spreadsheet_id,
|
||||
range=f"{target_sheet}!A:A",
|
||||
valueInputOption="USER_ENTERED",
|
||||
insertDataOption="INSERT_ROWS",
|
||||
body=body,
|
||||
)
|
||||
.execute()
|
||||
)
|
||||
|
||||
|
||||
class GoogleSheetsClearBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: GoogleCredentialsInput = GoogleCredentialsField(
|
||||
["https://www.googleapis.com/auth/spreadsheets"]
|
||||
)
|
||||
spreadsheet_id: str = SchemaField(
|
||||
description="The ID of the spreadsheet to clear",
|
||||
)
|
||||
range: str = SchemaField(
|
||||
description="The A1 notation of the range to clear",
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: dict = SchemaField(
|
||||
description="The result of the clear operation",
|
||||
)
|
||||
error: str = SchemaField(
|
||||
description="Error message if any",
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="84938266-0fc7-46e5-9369-adb0f6ae8015",
|
||||
description="This block clears data from a specified range in a Google Sheets spreadsheet.",
|
||||
categories={BlockCategory.DATA},
|
||||
input_schema=GoogleSheetsClearBlock.Input,
|
||||
output_schema=GoogleSheetsClearBlock.Output,
|
||||
disabled=GOOGLE_SHEETS_DISABLED,
|
||||
test_input={
|
||||
"spreadsheet_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
|
||||
"range": "Sheet1!A1:B2",
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
test_output=[
|
||||
("result", {"clearedRange": "Sheet1!A1:B2"}),
|
||||
],
|
||||
test_mock={
|
||||
"_clear_range": lambda *args, **kwargs: {
|
||||
"clearedRange": "Sheet1!A1:B2"
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
service = _build_sheets_service(credentials)
|
||||
result = await asyncio.to_thread(
|
||||
self._clear_range,
|
||||
service,
|
||||
input_data.spreadsheet_id,
|
||||
input_data.range,
|
||||
)
|
||||
yield "result", result
|
||||
|
||||
def _clear_range(self, service, spreadsheet_id: str, range: str) -> dict:
|
||||
result = (
|
||||
service.spreadsheets()
|
||||
.values()
|
||||
.clear(spreadsheetId=spreadsheet_id, range=range)
|
||||
.execute()
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
class GoogleSheetsMetadataBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: GoogleCredentialsInput = GoogleCredentialsField(
|
||||
["https://www.googleapis.com/auth/spreadsheets.readonly"]
|
||||
)
|
||||
spreadsheet_id: str = SchemaField(
|
||||
description="The ID of the spreadsheet to get metadata for",
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: dict = SchemaField(
|
||||
description="The metadata of the spreadsheet including sheets info",
|
||||
)
|
||||
error: str = SchemaField(
|
||||
description="Error message if any",
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="6a0be6ee-7a0d-4c92-819b-500630846ad0",
|
||||
description="This block retrieves metadata about a Google Sheets spreadsheet including sheet names and properties.",
|
||||
categories={BlockCategory.DATA},
|
||||
input_schema=GoogleSheetsMetadataBlock.Input,
|
||||
output_schema=GoogleSheetsMetadataBlock.Output,
|
||||
disabled=GOOGLE_SHEETS_DISABLED,
|
||||
test_input={
|
||||
"spreadsheet_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
test_output=[
|
||||
(
|
||||
"result",
|
||||
{
|
||||
"title": "Test Spreadsheet",
|
||||
"sheets": [{"title": "Sheet1", "sheetId": 0}],
|
||||
},
|
||||
),
|
||||
],
|
||||
test_mock={
|
||||
"_get_metadata": lambda *args, **kwargs: {
|
||||
"title": "Test Spreadsheet",
|
||||
"sheets": [{"title": "Sheet1", "sheetId": 0}],
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
service = _build_sheets_service(credentials)
|
||||
result = await asyncio.to_thread(
|
||||
self._get_metadata,
|
||||
service,
|
||||
input_data.spreadsheet_id,
|
||||
)
|
||||
yield "result", result
|
||||
|
||||
def _get_metadata(self, service, spreadsheet_id: str) -> dict:
|
||||
result = (
|
||||
service.spreadsheets()
|
||||
.get(spreadsheetId=spreadsheet_id, includeGridData=False)
|
||||
.execute()
|
||||
)
|
||||
return {
|
||||
"title": result.get("properties", {}).get("title"),
|
||||
"sheets": [
|
||||
{
|
||||
"title": sheet.get("properties", {}).get("title"),
|
||||
"sheetId": sheet.get("properties", {}).get("sheetId"),
|
||||
"gridProperties": sheet.get("properties", {}).get(
|
||||
"gridProperties", {}
|
||||
),
|
||||
}
|
||||
for sheet in result.get("sheets", [])
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
class GoogleSheetsManageSheetBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: GoogleCredentialsInput = GoogleCredentialsField(
|
||||
["https://www.googleapis.com/auth/spreadsheets"]
|
||||
)
|
||||
spreadsheet_id: str = SchemaField(description="Spreadsheet ID")
|
||||
operation: SheetOperation = SchemaField(description="Operation to perform")
|
||||
sheet_name: str = SchemaField(
|
||||
description="Target sheet name (defaults to first sheet for delete)",
|
||||
default="",
|
||||
)
|
||||
source_sheet_id: int = SchemaField(
|
||||
description="Source sheet ID for copy", default=0
|
||||
)
|
||||
destination_sheet_name: str = SchemaField(
|
||||
description="New sheet name for copy", default=""
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: dict = SchemaField(description="Operation result")
|
||||
error: str = SchemaField(description="Error message, if any")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="7940189d-b137-4ef1-aa18-3dd9a5bde9f3",
|
||||
description="Create, delete, or copy sheets (sheet optional)",
|
||||
categories={BlockCategory.DATA},
|
||||
input_schema=GoogleSheetsManageSheetBlock.Input,
|
||||
output_schema=GoogleSheetsManageSheetBlock.Output,
|
||||
disabled=GOOGLE_SHEETS_DISABLED,
|
||||
test_input={
|
||||
"spreadsheet_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
|
||||
"operation": SheetOperation.CREATE,
|
||||
"sheet_name": "NewSheet",
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
test_output=[("result", {"success": True, "sheetId": 123})],
|
||||
test_mock={
|
||||
"_manage_sheet": lambda *args, **kwargs: {
|
||||
"success": True,
|
||||
"sheetId": 123,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
service = _build_sheets_service(credentials)
|
||||
result = await asyncio.to_thread(
|
||||
self._manage_sheet,
|
||||
service,
|
||||
input_data.spreadsheet_id,
|
||||
input_data.operation,
|
||||
input_data.sheet_name,
|
||||
input_data.source_sheet_id,
|
||||
input_data.destination_sheet_name,
|
||||
)
|
||||
yield "result", result
|
||||
|
||||
def _manage_sheet(
|
||||
self,
|
||||
service,
|
||||
spreadsheet_id: str,
|
||||
operation: SheetOperation,
|
||||
sheet_name: str,
|
||||
source_sheet_id: int,
|
||||
destination_sheet_name: str,
|
||||
) -> dict:
|
||||
requests = []
|
||||
|
||||
# Ensure a target sheet name when needed
|
||||
target_name = resolve_sheet_name(service, spreadsheet_id, sheet_name)
|
||||
|
||||
if operation == SheetOperation.CREATE:
|
||||
requests.append({"addSheet": {"properties": {"title": target_name}}})
|
||||
elif operation == SheetOperation.DELETE:
|
||||
sid = sheet_id_by_name(service, spreadsheet_id, target_name)
|
||||
if sid is None:
|
||||
return {"error": f"Sheet '{target_name}' not found"}
|
||||
requests.append({"deleteSheet": {"sheetId": sid}})
|
||||
elif operation == SheetOperation.COPY:
|
||||
requests.append(
|
||||
{
|
||||
"duplicateSheet": {
|
||||
"sourceSheetId": source_sheet_id,
|
||||
"newSheetName": destination_sheet_name
|
||||
or f"Copy of {source_sheet_id}",
|
||||
}
|
||||
}
|
||||
)
|
||||
else:
|
||||
return {"error": f"Unknown operation: {operation}"}
|
||||
|
||||
body = {"requests": requests}
|
||||
result = (
|
||||
service.spreadsheets()
|
||||
.batchUpdate(spreadsheetId=spreadsheet_id, body=body)
|
||||
.execute()
|
||||
)
|
||||
return {"success": True, "result": result}
|
||||
|
||||
|
||||
class GoogleSheetsBatchOperationsBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: GoogleCredentialsInput = GoogleCredentialsField(
|
||||
["https://www.googleapis.com/auth/spreadsheets"]
|
||||
)
|
||||
spreadsheet_id: str = SchemaField(
|
||||
description="The ID of the spreadsheet to perform batch operations on",
|
||||
)
|
||||
operations: list[BatchOperation] = SchemaField(
|
||||
description="List of operations to perform",
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: dict = SchemaField(
|
||||
description="The result of the batch operations",
|
||||
)
|
||||
error: str = SchemaField(
|
||||
description="Error message if any",
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="a4078584-6fe5-46e0-997e-d5126cdd112a",
|
||||
description="This block performs multiple operations on a Google Sheets spreadsheet in a single batch request.",
|
||||
categories={BlockCategory.DATA},
|
||||
input_schema=GoogleSheetsBatchOperationsBlock.Input,
|
||||
output_schema=GoogleSheetsBatchOperationsBlock.Output,
|
||||
disabled=GOOGLE_SHEETS_DISABLED,
|
||||
test_input={
|
||||
"spreadsheet_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
|
||||
"operations": [
|
||||
{
|
||||
"type": BatchOperationType.UPDATE,
|
||||
"range": "A1:B1",
|
||||
"values": [["Header1", "Header2"]],
|
||||
},
|
||||
{
|
||||
"type": BatchOperationType.UPDATE,
|
||||
"range": "A2:B2",
|
||||
"values": [["Data1", "Data2"]],
|
||||
},
|
||||
],
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
test_output=[
|
||||
("result", {"totalUpdatedCells": 4, "replies": []}),
|
||||
],
|
||||
test_mock={
|
||||
"_batch_operations": lambda *args, **kwargs: {
|
||||
"totalUpdatedCells": 4,
|
||||
"replies": [],
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
service = _build_sheets_service(credentials)
|
||||
result = await asyncio.to_thread(
|
||||
self._batch_operations,
|
||||
service,
|
||||
input_data.spreadsheet_id,
|
||||
input_data.operations,
|
||||
)
|
||||
yield "result", result
|
||||
|
||||
def _batch_operations(
|
||||
self, service, spreadsheet_id: str, operations: list[BatchOperation]
|
||||
) -> dict:
|
||||
update_data = []
|
||||
clear_ranges = []
|
||||
|
||||
for op in operations:
|
||||
if op.type == BatchOperationType.UPDATE:
|
||||
update_data.append(
|
||||
{
|
||||
"range": op.range,
|
||||
"values": op.values,
|
||||
}
|
||||
)
|
||||
elif op.type == BatchOperationType.CLEAR:
|
||||
clear_ranges.append(op.range)
|
||||
|
||||
results = {}
|
||||
|
||||
# Perform updates if any
|
||||
if update_data:
|
||||
update_body = {
|
||||
"valueInputOption": "USER_ENTERED",
|
||||
"data": update_data,
|
||||
}
|
||||
update_result = (
|
||||
service.spreadsheets()
|
||||
.values()
|
||||
.batchUpdate(spreadsheetId=spreadsheet_id, body=update_body)
|
||||
.execute()
|
||||
)
|
||||
results["updateResult"] = update_result
|
||||
|
||||
# Perform clears if any
|
||||
if clear_ranges:
|
||||
clear_body = {"ranges": clear_ranges}
|
||||
clear_result = (
|
||||
service.spreadsheets()
|
||||
.values()
|
||||
.batchClear(spreadsheetId=spreadsheet_id, body=clear_body)
|
||||
.execute()
|
||||
)
|
||||
results["clearResult"] = clear_result
|
||||
|
||||
return results
|
||||
|
||||
|
||||
class GoogleSheetsFindReplaceBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: GoogleCredentialsInput = GoogleCredentialsField(
|
||||
["https://www.googleapis.com/auth/spreadsheets"]
|
||||
)
|
||||
spreadsheet_id: str = SchemaField(
|
||||
description="The ID of the spreadsheet to perform find/replace on",
|
||||
)
|
||||
find_text: str = SchemaField(
|
||||
description="The text to find",
|
||||
)
|
||||
replace_text: str = SchemaField(
|
||||
description="The text to replace with",
|
||||
)
|
||||
sheet_id: int = SchemaField(
|
||||
description="The ID of the specific sheet to search (optional, searches all sheets if not provided)",
|
||||
default=-1,
|
||||
)
|
||||
match_case: bool = SchemaField(
|
||||
description="Whether to match case",
|
||||
default=False,
|
||||
)
|
||||
match_entire_cell: bool = SchemaField(
|
||||
description="Whether to match entire cell",
|
||||
default=False,
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: dict = SchemaField(
|
||||
description="The result of the find/replace operation including number of replacements",
|
||||
)
|
||||
error: str = SchemaField(
|
||||
description="Error message if any",
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="accca760-8174-4656-b55e-5f0e82fee986",
|
||||
description="This block finds and replaces text in a Google Sheets spreadsheet.",
|
||||
categories={BlockCategory.DATA},
|
||||
input_schema=GoogleSheetsFindReplaceBlock.Input,
|
||||
output_schema=GoogleSheetsFindReplaceBlock.Output,
|
||||
disabled=GOOGLE_SHEETS_DISABLED,
|
||||
test_input={
|
||||
"spreadsheet_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
|
||||
"find_text": "old_value",
|
||||
"replace_text": "new_value",
|
||||
"match_case": False,
|
||||
"match_entire_cell": False,
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
test_output=[
|
||||
("result", {"occurrencesChanged": 5}),
|
||||
],
|
||||
test_mock={
|
||||
"_find_replace": lambda *args, **kwargs: {"occurrencesChanged": 5},
|
||||
},
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
service = _build_sheets_service(credentials)
|
||||
result = await asyncio.to_thread(
|
||||
self._find_replace,
|
||||
service,
|
||||
input_data.spreadsheet_id,
|
||||
input_data.find_text,
|
||||
input_data.replace_text,
|
||||
input_data.sheet_id,
|
||||
input_data.match_case,
|
||||
input_data.match_entire_cell,
|
||||
)
|
||||
yield "result", result
|
||||
|
||||
def _find_replace(
|
||||
self,
|
||||
service,
|
||||
spreadsheet_id: str,
|
||||
find_text: str,
|
||||
replace_text: str,
|
||||
sheet_id: int,
|
||||
match_case: bool,
|
||||
match_entire_cell: bool,
|
||||
) -> dict:
|
||||
find_replace_request = {
|
||||
"find": find_text,
|
||||
"replacement": replace_text,
|
||||
"matchCase": match_case,
|
||||
"matchEntireCell": match_entire_cell,
|
||||
}
|
||||
|
||||
if sheet_id >= 0:
|
||||
find_replace_request["sheetId"] = sheet_id
|
||||
|
||||
requests = [{"findReplace": find_replace_request}]
|
||||
body = {"requests": requests}
|
||||
|
||||
result = (
|
||||
service.spreadsheets()
|
||||
.batchUpdate(spreadsheetId=spreadsheet_id, body=body)
|
||||
.execute()
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class GoogleSheetsFormatBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: GoogleCredentialsInput = GoogleCredentialsField(
|
||||
["https://www.googleapis.com/auth/spreadsheets"]
|
||||
)
|
||||
spreadsheet_id: str = SchemaField(description="Spreadsheet ID")
|
||||
range: str = SchemaField(description="A1 notation – sheet optional")
|
||||
background_color: dict = SchemaField(default={})
|
||||
text_color: dict = SchemaField(default={})
|
||||
bold: bool = SchemaField(default=False)
|
||||
italic: bool = SchemaField(default=False)
|
||||
font_size: int = SchemaField(default=10)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: dict = SchemaField(description="API response or success flag")
|
||||
error: str = SchemaField(description="Error message, if any")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="270f2384-8089-4b5b-b2e3-fe2ea3d87c02",
|
||||
description="Format a range in a Google Sheet (sheet optional)",
|
||||
categories={BlockCategory.DATA},
|
||||
input_schema=GoogleSheetsFormatBlock.Input,
|
||||
output_schema=GoogleSheetsFormatBlock.Output,
|
||||
disabled=GOOGLE_SHEETS_DISABLED,
|
||||
test_input={
|
||||
"spreadsheet_id": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
|
||||
"range": "A1:B2",
|
||||
"background_color": {"red": 1.0, "green": 0.9, "blue": 0.9},
|
||||
"bold": True,
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
test_output=[("result", {"success": True})],
|
||||
test_mock={"_format_cells": lambda *args, **kwargs: {"success": True}},
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
service = _build_sheets_service(credentials)
|
||||
result = await asyncio.to_thread(
|
||||
self._format_cells,
|
||||
service,
|
||||
input_data.spreadsheet_id,
|
||||
input_data.range,
|
||||
input_data.background_color,
|
||||
input_data.text_color,
|
||||
input_data.bold,
|
||||
input_data.italic,
|
||||
input_data.font_size,
|
||||
)
|
||||
if "error" in result:
|
||||
yield "error", result["error"]
|
||||
else:
|
||||
yield "result", result
|
||||
|
||||
def _format_cells(
|
||||
self,
|
||||
service,
|
||||
spreadsheet_id: str,
|
||||
a1_range: str,
|
||||
background_color: dict,
|
||||
text_color: dict,
|
||||
bold: bool,
|
||||
italic: bool,
|
||||
font_size: int,
|
||||
) -> dict:
|
||||
sheet_name, cell_range = parse_a1_notation(a1_range)
|
||||
sheet_name = resolve_sheet_name(service, spreadsheet_id, sheet_name)
|
||||
|
||||
sheet_id = sheet_id_by_name(service, spreadsheet_id, sheet_name)
|
||||
if sheet_id is None:
|
||||
return {"error": f"Sheet '{sheet_name}' not found"}
|
||||
|
||||
try:
|
||||
start_cell, end_cell = cell_range.split(":")
|
||||
start_col = ord(start_cell[0].upper()) - ord("A")
|
||||
start_row = int(start_cell[1:]) - 1
|
||||
end_col = ord(end_cell[0].upper()) - ord("A") + 1
|
||||
end_row = int(end_cell[1:])
|
||||
except (ValueError, IndexError):
|
||||
return {"error": f"Invalid range format: {a1_range}"}
|
||||
|
||||
cell_format: dict = {"userEnteredFormat": {}}
|
||||
if background_color:
|
||||
cell_format["userEnteredFormat"]["backgroundColor"] = background_color
|
||||
|
||||
text_format: dict = {}
|
||||
if text_color:
|
||||
text_format["foregroundColor"] = text_color
|
||||
if bold:
|
||||
text_format["bold"] = True
|
||||
if italic:
|
||||
text_format["italic"] = True
|
||||
if font_size != 10:
|
||||
text_format["fontSize"] = font_size
|
||||
if text_format:
|
||||
cell_format["userEnteredFormat"]["textFormat"] = text_format
|
||||
|
||||
body = {
|
||||
"requests": [
|
||||
{
|
||||
"repeatCell": {
|
||||
"range": {
|
||||
"sheetId": sheet_id,
|
||||
"startRowIndex": start_row,
|
||||
"endRowIndex": end_row,
|
||||
"startColumnIndex": start_col,
|
||||
"endColumnIndex": end_col,
|
||||
},
|
||||
"cell": cell_format,
|
||||
"fields": "userEnteredFormat(backgroundColor,textFormat)",
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
service.spreadsheets().batchUpdate(
|
||||
spreadsheetId=spreadsheet_id, body=body
|
||||
).execute()
|
||||
return {"success": True}
|
||||
|
||||
|
||||
class GoogleSheetsCreateSpreadsheetBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: GoogleCredentialsInput = GoogleCredentialsField(
|
||||
["https://www.googleapis.com/auth/spreadsheets"]
|
||||
)
|
||||
title: str = SchemaField(
|
||||
description="The title of the new spreadsheet",
|
||||
)
|
||||
sheet_names: list[str] = SchemaField(
|
||||
description="List of sheet names to create (optional, defaults to single 'Sheet1')",
|
||||
default=["Sheet1"],
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
result: dict = SchemaField(
|
||||
description="The result containing spreadsheet ID and URL",
|
||||
)
|
||||
spreadsheet_id: str = SchemaField(
|
||||
description="The ID of the created spreadsheet",
|
||||
)
|
||||
spreadsheet_url: str = SchemaField(
|
||||
description="The URL of the created spreadsheet",
|
||||
)
|
||||
error: str = SchemaField(
|
||||
description="Error message if any",
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="c8d4c0d3-c76e-4c2a-8c66-4119817ea3d1",
|
||||
description="This block creates a new Google Sheets spreadsheet with specified sheets.",
|
||||
categories={BlockCategory.DATA},
|
||||
input_schema=GoogleSheetsCreateSpreadsheetBlock.Input,
|
||||
output_schema=GoogleSheetsCreateSpreadsheetBlock.Output,
|
||||
disabled=GOOGLE_SHEETS_DISABLED,
|
||||
test_input={
|
||||
"title": "Test Spreadsheet",
|
||||
"sheet_names": ["Sheet1", "Data", "Summary"],
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
test_output=[
|
||||
("spreadsheet_id", "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms"),
|
||||
(
|
||||
"spreadsheet_url",
|
||||
"https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit",
|
||||
),
|
||||
("result", {"success": True}),
|
||||
],
|
||||
test_mock={
|
||||
"_create_spreadsheet": lambda *args, **kwargs: {
|
||||
"spreadsheetId": "1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms",
|
||||
"spreadsheetUrl": "https://docs.google.com/spreadsheets/d/1BxiMVs0XRA5nFMdKvBdBZjgmUUqptlbs74OgvE2upms/edit",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
async def run(
|
||||
self, input_data: Input, *, credentials: GoogleCredentials, **kwargs
|
||||
) -> BlockOutput:
|
||||
service = _build_sheets_service(credentials)
|
||||
result = await asyncio.to_thread(
|
||||
self._create_spreadsheet,
|
||||
service,
|
||||
input_data.title,
|
||||
input_data.sheet_names,
|
||||
)
|
||||
|
||||
if "error" in result:
|
||||
yield "error", result["error"]
|
||||
else:
|
||||
yield "spreadsheet_id", result["spreadsheetId"]
|
||||
yield "spreadsheet_url", result["spreadsheetUrl"]
|
||||
yield "result", {"success": True}
|
||||
|
||||
def _create_spreadsheet(self, service, title: str, sheet_names: list[str]) -> dict:
|
||||
try:
|
||||
# Create the initial spreadsheet
|
||||
spreadsheet_body = {
|
||||
"properties": {"title": title},
|
||||
"sheets": [
|
||||
{
|
||||
"properties": {
|
||||
"title": sheet_names[0] if sheet_names else "Sheet1"
|
||||
}
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
result = service.spreadsheets().create(body=spreadsheet_body).execute()
|
||||
spreadsheet_id = result["spreadsheetId"]
|
||||
spreadsheet_url = result["spreadsheetUrl"]
|
||||
|
||||
# Add additional sheets if requested
|
||||
if len(sheet_names) > 1:
|
||||
requests = []
|
||||
for sheet_name in sheet_names[1:]:
|
||||
requests.append({"addSheet": {"properties": {"title": sheet_name}}})
|
||||
|
||||
if requests:
|
||||
batch_body = {"requests": requests}
|
||||
service.spreadsheets().batchUpdate(
|
||||
spreadsheetId=spreadsheet_id, body=batch_body
|
||||
).execute()
|
||||
|
||||
return {
|
||||
"spreadsheetId": spreadsheet_id,
|
||||
"spreadsheetUrl": spreadsheet_url,
|
||||
}
|
||||
except Exception as e:
|
||||
return {"error": str(e)}
|
||||
|
||||
@@ -67,12 +67,11 @@ class AddMemoryBlock(Block, Mem0Base):
|
||||
metadata: dict[str, Any] = SchemaField(
|
||||
description="Optional metadata for the memory", default_factory=dict
|
||||
)
|
||||
|
||||
limit_memory_to_run: bool = SchemaField(
|
||||
description="Limit the memory to the run", default=False
|
||||
)
|
||||
limit_memory_to_agent: bool = SchemaField(
|
||||
description="Limit the memory to the agent", default=False
|
||||
description="Limit the memory to the agent", default=True
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
@@ -104,7 +103,12 @@ class AddMemoryBlock(Block, Mem0Base):
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
],
|
||||
test_output=[("action", "NO_CHANGE"), ("action", "NO_CHANGE")],
|
||||
test_output=[
|
||||
("action", "CREATED"),
|
||||
("memory", "test memory"),
|
||||
("action", "CREATED"),
|
||||
("memory", "test memory"),
|
||||
],
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
test_mock={"_get_client": lambda credentials: MockMemoryClient()},
|
||||
)
|
||||
@@ -117,7 +121,7 @@ class AddMemoryBlock(Block, Mem0Base):
|
||||
user_id: str,
|
||||
graph_id: str,
|
||||
graph_exec_id: str,
|
||||
**kwargs
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
client = self._get_client(credentials)
|
||||
@@ -178,6 +182,10 @@ class SearchMemoryBlock(Block, Mem0Base):
|
||||
default_factory=list,
|
||||
advanced=True,
|
||||
)
|
||||
metadata_filter: Optional[dict[str, Any]] = SchemaField(
|
||||
description="Optional metadata filters to apply",
|
||||
default=None,
|
||||
)
|
||||
limit_memory_to_run: bool = SchemaField(
|
||||
description="Limit the memory to the run", default=False
|
||||
)
|
||||
@@ -216,7 +224,7 @@ class SearchMemoryBlock(Block, Mem0Base):
|
||||
user_id: str,
|
||||
graph_id: str,
|
||||
graph_exec_id: str,
|
||||
**kwargs
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
client = self._get_client(credentials)
|
||||
@@ -235,6 +243,8 @@ class SearchMemoryBlock(Block, Mem0Base):
|
||||
filters["AND"].append({"run_id": graph_exec_id})
|
||||
if input_data.limit_memory_to_agent:
|
||||
filters["AND"].append({"agent_id": graph_id})
|
||||
if input_data.metadata_filter:
|
||||
filters["AND"].append({"metadata": input_data.metadata_filter})
|
||||
|
||||
result: list[dict[str, Any]] = client.search(
|
||||
input_data.query, version="v2", filters=filters
|
||||
@@ -260,11 +270,15 @@ class GetAllMemoriesBlock(Block, Mem0Base):
|
||||
categories: Optional[list[str]] = SchemaField(
|
||||
description="Filter by categories", default=None
|
||||
)
|
||||
metadata_filter: Optional[dict[str, Any]] = SchemaField(
|
||||
description="Optional metadata filters to apply",
|
||||
default=None,
|
||||
)
|
||||
limit_memory_to_run: bool = SchemaField(
|
||||
description="Limit the memory to the run", default=False
|
||||
)
|
||||
limit_memory_to_agent: bool = SchemaField(
|
||||
description="Limit the memory to the agent", default=False
|
||||
description="Limit the memory to the agent", default=True
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
@@ -274,11 +288,11 @@ class GetAllMemoriesBlock(Block, Mem0Base):
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="45aee5bf-4767-45d1-a28b-e01c5aae9fc1",
|
||||
description="Retrieve all memories from Mem0 with pagination",
|
||||
description="Retrieve all memories from Mem0 with optional conversation filtering",
|
||||
input_schema=GetAllMemoriesBlock.Input,
|
||||
output_schema=GetAllMemoriesBlock.Output,
|
||||
test_input={
|
||||
"user_id": "test_user",
|
||||
"metadata_filter": {"type": "test"},
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
test_output=[
|
||||
@@ -296,7 +310,7 @@ class GetAllMemoriesBlock(Block, Mem0Base):
|
||||
user_id: str,
|
||||
graph_id: str,
|
||||
graph_exec_id: str,
|
||||
**kwargs
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
client = self._get_client(credentials)
|
||||
@@ -314,6 +328,8 @@ class GetAllMemoriesBlock(Block, Mem0Base):
|
||||
filters["AND"].append(
|
||||
{"categories": {"contains": input_data.categories}}
|
||||
)
|
||||
if input_data.metadata_filter:
|
||||
filters["AND"].append({"metadata": input_data.metadata_filter})
|
||||
|
||||
memories: list[dict[str, Any]] = client.get_all(
|
||||
filters=filters,
|
||||
@@ -326,14 +342,116 @@ class GetAllMemoriesBlock(Block, Mem0Base):
|
||||
yield "error", str(e)
|
||||
|
||||
|
||||
class GetLatestMemoryBlock(Block, Mem0Base):
|
||||
"""Block for retrieving the latest memory from Mem0"""
|
||||
|
||||
class Input(BlockSchema):
|
||||
credentials: CredentialsMetaInput[
|
||||
Literal[ProviderName.MEM0], Literal["api_key"]
|
||||
] = CredentialsField(description="Mem0 API key credentials")
|
||||
trigger: bool = SchemaField(
|
||||
description="An unused field that is used to trigger the block when you have no other inputs",
|
||||
default=False,
|
||||
advanced=False,
|
||||
)
|
||||
categories: Optional[list[str]] = SchemaField(
|
||||
description="Filter by categories", default=None
|
||||
)
|
||||
conversation_id: Optional[str] = SchemaField(
|
||||
description="Optional conversation ID to retrieve the latest memory from (uses run_id)",
|
||||
default=None,
|
||||
)
|
||||
metadata_filter: Optional[dict[str, Any]] = SchemaField(
|
||||
description="Optional metadata filters to apply",
|
||||
default=None,
|
||||
)
|
||||
limit_memory_to_run: bool = SchemaField(
|
||||
description="Limit the memory to the run", default=False
|
||||
)
|
||||
limit_memory_to_agent: bool = SchemaField(
|
||||
description="Limit the memory to the agent", default=True
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
memory: Optional[dict[str, Any]] = SchemaField(
|
||||
description="Latest memory if found"
|
||||
)
|
||||
found: bool = SchemaField(description="Whether a memory was found")
|
||||
error: str = SchemaField(description="Error message if operation fails")
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="0f9d81b5-a145-4c23-b87f-01d6bf37b677",
|
||||
description="Retrieve the latest memory from Mem0 with optional key filtering",
|
||||
input_schema=GetLatestMemoryBlock.Input,
|
||||
output_schema=GetLatestMemoryBlock.Output,
|
||||
test_input={
|
||||
"metadata_filter": {"type": "test"},
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
},
|
||||
test_output=[
|
||||
("memory", {"id": "test-memory", "content": "test content"}),
|
||||
("found", True),
|
||||
],
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
test_mock={"_get_client": lambda credentials: MockMemoryClient()},
|
||||
)
|
||||
|
||||
async def run(
|
||||
self,
|
||||
input_data: Input,
|
||||
*,
|
||||
credentials: APIKeyCredentials,
|
||||
user_id: str,
|
||||
graph_id: str,
|
||||
graph_exec_id: str,
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
client = self._get_client(credentials)
|
||||
|
||||
filters: Filter = {
|
||||
"AND": [
|
||||
{"user_id": user_id},
|
||||
]
|
||||
}
|
||||
if input_data.limit_memory_to_run:
|
||||
filters["AND"].append({"run_id": graph_exec_id})
|
||||
if input_data.limit_memory_to_agent:
|
||||
filters["AND"].append({"agent_id": graph_id})
|
||||
if input_data.categories:
|
||||
filters["AND"].append(
|
||||
{"categories": {"contains": input_data.categories}}
|
||||
)
|
||||
if input_data.metadata_filter:
|
||||
filters["AND"].append({"metadata": input_data.metadata_filter})
|
||||
|
||||
memories: list[dict[str, Any]] = client.get_all(
|
||||
filters=filters,
|
||||
version="v2",
|
||||
)
|
||||
|
||||
if memories:
|
||||
# Return the latest memory (first in the list as they're sorted by recency)
|
||||
latest_memory = memories[0]
|
||||
yield "memory", latest_memory
|
||||
yield "found", True
|
||||
else:
|
||||
yield "memory", None
|
||||
yield "found", False
|
||||
|
||||
except Exception as e:
|
||||
yield "error", str(e)
|
||||
|
||||
|
||||
# Mock client for testing
|
||||
class MockMemoryClient:
|
||||
"""Mock Mem0 client for testing"""
|
||||
|
||||
def add(self, *args, **kwargs):
|
||||
return {"memory_id": "test-memory-id", "status": "success"}
|
||||
return {"results": [{"event": "CREATED", "memory": "test memory"}]}
|
||||
|
||||
def search(self, *args, **kwargs) -> list[dict[str, str]]:
|
||||
def search(self, *args, **kwargs) -> list[dict[str, Any]]:
|
||||
return [{"id": "test-memory", "content": "test content"}]
|
||||
|
||||
def get_all(self, *args, **kwargs) -> list[dict[str, str]]:
|
||||
|
||||
Reference in New Issue
Block a user