feat(block): Add LLM prompt as the output pin (#9330)

### Changes 🏗️

To ease debugging, this change exposes the prompt sent to the LLM provider as an output pin.

<img width="418" alt="image"
src="https://github.com/user-attachments/assets/0c8d7502-4f87-4002-a498-331f341859bd"
/>
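
Under the hood, every LLM block now records the serialized prompt it actually sent and re-emits it on a new `prompt` output pin; the wrapper blocks (text generator, summarizer, conversation, list generator) copy it from the inner structured-response block together with the usage stats. A minimal standalone sketch of that mechanism (illustrative only, not the platform code itself):

```python
import json


class AIBlockBase:
    """Sketch: shared base that remembers the last prompt sent to the provider."""

    def __init__(self):
        self.prompt = ""           # JSON-serialized messages from the last LLM call
        self.execution_stats = {}

    def merge_stats(self, stats: dict):
        self.execution_stats.update(stats)

    def merge_llm_stats(self, block: "AIBlockBase"):
        # Copy both the token stats and the prompt from the delegate block,
        # so wrapper blocks can expose the same "prompt" output pin.
        self.merge_stats(block.execution_stats)
        self.prompt = block.prompt
```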


### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [ ] ...

<details>
  <summary>Example test plan</summary>
  
  - [ ] Create from scratch and execute an agent with at least 3 blocks
  - [ ] Import an agent from file upload, and confirm it executes correctly
  - [ ] Upload agent to marketplace
  - [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my changes
- [ ] I have included a list of my configuration changes in the PR description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New or changed infrastructure, such as databases
</details>
Commit d74e4ef1a8 (parent 5b7a491a36)
Author: Zamil Majdy
Date: 2025-01-25 19:33:41 +07:00
Committed by: GitHub


@@ -1,5 +1,6 @@
 import ast
 import logging
+from abc import ABC
 from enum import Enum, EnumMeta
 from json import JSONDecodeError
 from types import MappingProxyType
@@ -206,7 +207,17 @@ class Message(BlockSchema):
     content: str


-class AIStructuredResponseGeneratorBlock(Block):
+class AIBlockBase(Block, ABC):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.prompt = ""
+
+    def merge_llm_stats(self, block: "AIBlockBase"):
+        self.merge_stats(block.execution_stats)
+        self.prompt = block.prompt
+
+
+class AIStructuredResponseGeneratorBlock(AIBlockBase):
     class Input(BlockSchema):
         prompt: str = SchemaField(
             description="The prompt to send to the language model.",
@@ -258,6 +269,7 @@ class AIStructuredResponseGeneratorBlock(Block):
         response: dict[str, Any] = SchemaField(
             description="The response object generated by the language model."
         )
+        prompt: str = SchemaField(description="The prompt sent to the language model.")
         error: str = SchemaField(description="Error message if the API call failed.")

     def __init__(self):
@@ -277,7 +289,10 @@ class AIStructuredResponseGeneratorBlock(Block):
                 "prompt": "User prompt",
             },
             test_credentials=TEST_CREDENTIALS,
-            test_output=("response", {"key1": "key1Value", "key2": "key2Value"}),
+            test_output=[
+                ("response", {"key1": "key1Value", "key2": "key2Value"}),
+                ("prompt", str),
+            ],
             test_mock={
                 "llm_call": lambda *args, **kwargs: (
                     json.dumps(
@@ -291,9 +306,10 @@ class AIStructuredResponseGeneratorBlock(Block):
                 )
             },
         )
+        self.prompt = ""

-    @staticmethod
     def llm_call(
+        self,
         credentials: APIKeyCredentials,
         llm_model: LlmModel,
         prompt: list[dict],
@@ -303,7 +319,7 @@ class AIStructuredResponseGeneratorBlock(Block):
     ) -> tuple[str, int, int]:
         """
         Args:
-            api_key: API key for the LLM provider.
+            credentials: The API key credentials to use.
             llm_model: The LLM model to use.
             prompt: The prompt to send to the LLM.
             json_format: Whether the response should be in JSON format.
@@ -337,6 +353,7 @@ class AIStructuredResponseGeneratorBlock(Block):
                 response_format=response_format,  # type: ignore
                 max_completion_tokens=max_tokens,
             )
+            self.prompt = json.dumps(prompt)
             return (
                 response.choices[0].message.content or "",
                 response.usage.prompt_tokens if response.usage else 0,
@@ -366,6 +383,7 @@ class AIStructuredResponseGeneratorBlock(Block):
                 messages=messages,
                 max_tokens=max_tokens or 8192,
             )
+            self.prompt = json.dumps(prompt)

             if not resp.content:
                 raise ValueError("No content returned from Anthropic.")
@@ -392,6 +410,7 @@ class AIStructuredResponseGeneratorBlock(Block):
                 response_format=response_format,  # type: ignore
                 max_tokens=max_tokens,
             )
+            self.prompt = json.dumps(prompt)
             return (
                 response.choices[0].message.content or "",
                 response.usage.prompt_tokens if response.usage else 0,
@@ -406,6 +425,7 @@ class AIStructuredResponseGeneratorBlock(Block):
                 prompt=f"{sys_messages}\n\n{usr_messages}",
                 stream=False,
             )
+            self.prompt = json.dumps(prompt)
             return (
                 response.get("response") or "",
                 response.get("prompt_eval_count") or 0,
@@ -426,6 +446,7 @@ class AIStructuredResponseGeneratorBlock(Block):
                 messages=prompt,  # type: ignore
                 max_tokens=max_tokens,
             )
+            self.prompt = json.dumps(prompt)

             # If there's no response, raise an error
             if not response.choices:
@@ -525,9 +546,11 @@ class AIStructuredResponseGeneratorBlock(Block):
                     )
                     for k, v in parsed_dict.items()
                 }
+                yield "prompt", self.prompt
                 return
             else:
                 yield "response", {"response": response_text}
+                yield "prompt", self.prompt
                 return

         retry_prompt = trim_prompt(
@@ -558,7 +581,7 @@ class AIStructuredResponseGeneratorBlock(Block):
         raise RuntimeError(retry_prompt)


-class AITextGeneratorBlock(Block):
+class AITextGeneratorBlock(AIBlockBase):
     class Input(BlockSchema):
         prompt: str = SchemaField(
             description="The prompt to send to the language model. You can use any of the {keys} from Prompt Values to fill in the prompt with values from the prompt values dictionary by putting them in curly braces.",
@@ -601,6 +624,7 @@ class AITextGeneratorBlock(Block):
         response: str = SchemaField(
             description="The response generated by the language model."
         )
+        prompt: str = SchemaField(description="The prompt sent to the language model.")
         error: str = SchemaField(description="Error message if the API call failed.")

     def __init__(self):
@@ -615,7 +639,10 @@ class AITextGeneratorBlock(Block):
                 "credentials": TEST_CREDENTIALS_INPUT,
             },
             test_credentials=TEST_CREDENTIALS,
-            test_output=("response", "Response text"),
+            test_output=[
+                ("response", "Response text"),
+                ("prompt", str),
+            ],
             test_mock={"llm_call": lambda *args, **kwargs: "Response text"},
         )

@@ -626,7 +653,7 @@ class AITextGeneratorBlock(Block):
     ) -> str:
         block = AIStructuredResponseGeneratorBlock()
         response = block.run_once(input_data, "response", credentials=credentials)
-        self.merge_stats(block.execution_stats)
+        self.merge_llm_stats(block)
         return response["response"]

     def run(
@@ -637,6 +664,7 @@ class AITextGeneratorBlock(Block):
             expected_format={},
         )
         yield "response", self.llm_call(object_input_data, credentials)
+        yield "prompt", self.prompt


 class SummaryStyle(Enum):
@@ -646,7 +674,7 @@
     NUMBERED_LIST = "numbered list"


-class AITextSummarizerBlock(Block):
+class AITextSummarizerBlock(AIBlockBase):
     class Input(BlockSchema):
         text: str = SchemaField(
             description="The text to summarize.",
@@ -689,6 +717,7 @@ class AITextSummarizerBlock(Block):

     class Output(BlockSchema):
         summary: str = SchemaField(description="The final summary of the text.")
+        prompt: str = SchemaField(description="The prompt sent to the language model.")
         error: str = SchemaField(description="Error message if the API call failed.")

     def __init__(self):
@@ -703,7 +732,10 @@ class AITextSummarizerBlock(Block):
                 "credentials": TEST_CREDENTIALS_INPUT,
             },
             test_credentials=TEST_CREDENTIALS,
-            test_output=("summary", "Final summary of a long text"),
+            test_output=[
+                ("summary", "Final summary of a long text"),
+                ("prompt", str),
+            ],
             test_mock={
                 "llm_call": lambda input_data, credentials: (
                     {"final_summary": "Final summary of a long text"}
@@ -731,6 +763,7 @@ class AITextSummarizerBlock(Block):

         final_summary = self._combine_summaries(summaries, input_data, credentials)
         yield "summary", final_summary
+        yield "prompt", self.prompt

     @staticmethod
     def _split_text(text: str, max_tokens: int, overlap: int) -> list[str]:
@@ -751,7 +784,7 @@ class AITextSummarizerBlock(Block):
     ) -> dict:
         block = AIStructuredResponseGeneratorBlock()
         response = block.run_once(input_data, "response", credentials=credentials)
-        self.merge_stats(block.execution_stats)
+        self.merge_llm_stats(block)
         return response

     def _summarize_chunk(
@@ -808,7 +841,7 @@ class AITextSummarizerBlock(Block):
         ]  # Get the first yielded value


-class AIConversationBlock(Block):
+class AIConversationBlock(AIBlockBase):
     class Input(BlockSchema):
         messages: List[Message] = SchemaField(
             description="List of messages in the conversation.", min_length=1
@@ -834,6 +867,7 @@ class AIConversationBlock(Block):
         response: str = SchemaField(
             description="The model's response to the conversation."
         )
+        prompt: str = SchemaField(description="The prompt sent to the language model.")
         error: str = SchemaField(description="Error message if the API call failed.")

     def __init__(self):
@@ -857,10 +891,13 @@ class AIConversationBlock(Block):
                 "credentials": TEST_CREDENTIALS_INPUT,
             },
             test_credentials=TEST_CREDENTIALS,
-            test_output=(
-                "response",
-                "The 2020 World Series was played at Globe Life Field in Arlington, Texas.",
-            ),
+            test_output=[
+                (
+                    "response",
+                    "The 2020 World Series was played at Globe Life Field in Arlington, Texas.",
+                ),
+                ("prompt", str),
+            ],
             test_mock={
                 "llm_call": lambda *args, **kwargs: "The 2020 World Series was played at Globe Life Field in Arlington, Texas."
             },
@@ -873,7 +910,7 @@ class AIConversationBlock(Block):
     ) -> str:
         block = AIStructuredResponseGeneratorBlock()
         response = block.run_once(input_data, "response", credentials=credentials)
-        self.merge_stats(block.execution_stats)
+        self.merge_llm_stats(block)
         return response["response"]

     def run(
@@ -892,9 +929,10 @@
         )

         yield "response", response
+        yield "prompt", self.prompt


-class AIListGeneratorBlock(Block):
+class AIListGeneratorBlock(AIBlockBase):
     class Input(BlockSchema):
         focus: str | None = SchemaField(
             description="The focus of the list to generate.",
@@ -937,6 +975,7 @@ class AIListGeneratorBlock(Block):
         list_item: str = SchemaField(
             description="Each individual item in the list.",
         )
+        prompt: str = SchemaField(description="The prompt sent to the language model.")
         error: str = SchemaField(
             description="Error message if the list generation failed."
         )
@@ -968,6 +1007,7 @@ class AIListGeneratorBlock(Block):
                     "generated_list",
                     ["Zylora Prime", "Kharon-9", "Vortexia", "Oceara", "Draknos"],
                 ),
+                ("prompt", str),
                 ("list_item", "Zylora Prime"),
                 ("list_item", "Kharon-9"),
                 ("list_item", "Vortexia"),
@@ -981,13 +1021,14 @@ class AIListGeneratorBlock(Block):
             },
         )

-    @staticmethod
     def llm_call(
+        self,
         input_data: AIStructuredResponseGeneratorBlock.Input,
         credentials: APIKeyCredentials,
     ) -> dict[str, str]:
         llm_block = AIStructuredResponseGeneratorBlock()
         response = llm_block.run_once(input_data, "response", credentials=credentials)
+        self.merge_llm_stats(llm_block)
         return response

     @staticmethod
@@ -1101,6 +1142,7 @@ class AIListGeneratorBlock(Block):
             # If we reach here, we have a valid Python list
             logger.debug("Successfully generated a valid Python list")
             yield "generated_list", parsed_list
+            yield "prompt", self.prompt

             # Yield each item in the list
             for item in parsed_list:
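
For a sense of how the new pin surfaces downstream, here is a runnable toy stand-in (none of these names are platform code): everything a block's `run()` yields is a `(pin_name, value)` pair, and `prompt` now arrives alongside `response`.

```python
import json


class ToyLLMBlock:
    """Toy stand-in for an AI block after this change (not the platform code)."""

    def __init__(self):
        self.prompt = ""  # last prompt sent to the provider, JSON-serialized

    def run(self, user_prompt: str):
        messages = [{"role": "user", "content": user_prompt}]
        self.prompt = json.dumps(messages)  # recorded where llm_call happens
        yield "response", "hello!"          # canned model output
        yield "prompt", self.prompt         # the new output pin


for pin, value in ToyLLMBlock().run("Say hi"):
    print(f"{pin}: {value}")
```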