feat(block): Add best-effort support for multiple/parallel tool calls in the SmartDecisionMaker block

This commit is contained in:
Zamil Majdy
2025-07-04 10:19:09 -07:00
parent 149bbd910a
commit 171deea806
2 changed files with 105 additions and 66 deletions

View File

@@ -273,6 +273,7 @@ class LLMResponse(BaseModel):
tool_calls: Optional[List[ToolContentBlock]] | None
prompt_tokens: int
completion_tokens: int
reasoning: Optional[str] = None
def convert_openai_tool_fmt_to_anthropic(
@@ -307,6 +308,46 @@ def convert_openai_tool_fmt_to_anthropic(
return anthropic_tools
def extract_openai_reasoning(response) -> str | None:
    """Extract reasoning from an OpenAI-compatible response, if available.

    Note: this will likely not work, since the reasoning is typically not
    present in the Chat Completions response (it is exposed by a separate
    Responses API).
    """
    reasoning = None
    choice = response.choices[0]
    # Probe the likely locations in order: the choice, the top-level
    # response, and finally the message.
    if getattr(choice, "reasoning", None):
        reasoning = str(choice.reasoning)
    elif getattr(response, "reasoning", None):
        reasoning = str(response.reasoning)
    elif getattr(choice.message, "reasoning", None):
        reasoning = str(choice.message.reasoning)
    return reasoning
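For illustration, a minimal sketch of how this defensive probing behaves, using hypothetical SimpleNamespace stand-ins rather than real OpenAI SDK types, and assuming extract_openai_reasoning from the diff above is in scope:

from types import SimpleNamespace

# Stand-in objects whose field names mirror the attributes probed above.
message = SimpleNamespace(reasoning="step-by-step notes")
choice = SimpleNamespace(reasoning=None, message=message)
response = SimpleNamespace(choices=[choice], reasoning=None)

# Falls through choice and response (both None) to the message attribute.
assert extract_openai_reasoning(response) == "step-by-step notes"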
def extract_openai_tool_calls(response) -> list[ToolContentBlock] | None:
    """Extract tool calls from an OpenAI-compatible response."""
    if tool_calls := response.choices[0].message.tool_calls:
        return [
            ToolContentBlock(
                id=tool.id,
                type=tool.type,
                function=ToolCall(
                    name=tool.function.name,
                    arguments=tool.function.arguments,
                ),
            )
            for tool in tool_calls
        ]
    return None
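A minimal sketch showing that a response carrying two parallel tool calls yields two ToolContentBlock entries. The objects below are SimpleNamespace stand-ins shaped like the SDK's tool-call payload, not real OpenAI types, and assume the module's ToolContentBlock/ToolCall models are in scope:

from types import SimpleNamespace

def _fake_call(call_id, name, arguments):
    # Hypothetical stand-in shaped closely enough for
    # extract_openai_tool_calls to consume.
    return SimpleNamespace(
        id=call_id,
        type="function",
        function=SimpleNamespace(name=name, arguments=arguments),
    )

message = SimpleNamespace(
    tool_calls=[
        _fake_call("call_1", "search", '{"query": "weather"}'),
        _fake_call("call_2", "calculator", '{"expression": "2+2"}'),
    ]
)
response = SimpleNamespace(choices=[SimpleNamespace(message=message)])

calls = extract_openai_tool_calls(response)
assert calls is not None and len(calls) == 2  # parallel calls preserved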
def get_parallel_tool_calls_param(llm_model: LlmModel, parallel_tool_calls):
    """Get the appropriate parallel_tool_calls parameter for OpenAI-compatible APIs."""
    # OpenAI o-series reasoning models do not accept parallel_tool_calls, so
    # omit the parameter for them, and whenever the caller did not set it.
    if llm_model.startswith("o") or parallel_tool_calls is None:
        return openai.NOT_GIVEN
    return parallel_tool_calls
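A quick sketch of the intended behavior, using plain strings in place of LlmModel for brevity and assuming the module's openai import:

# o-series reasoning models: the parameter is always omitted.
assert get_parallel_tool_calls_param("o3-mini", True) is openai.NOT_GIVEN
# Unset by the caller: omitted as well, so the API default applies.
assert get_parallel_tool_calls_param("gpt-4o", None) is openai.NOT_GIVEN
# Otherwise the caller's choice is passed through unchanged.
assert get_parallel_tool_calls_param("gpt-4o", False) is False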
async def llm_call(
credentials: APIKeyCredentials,
llm_model: LlmModel,
@@ -360,8 +401,9 @@ async def llm_call(
oai_client = openai.AsyncOpenAI(api_key=credentials.api_key.get_secret_value())
response_format = None
if llm_model.startswith("o") or parallel_tool_calls is None:
parallel_tool_calls = openai.NOT_GIVEN
parallel_tool_calls = get_parallel_tool_calls_param(
llm_model, parallel_tool_calls
)
if json_format:
response_format = {"type": "json_object"}
@@ -375,20 +417,8 @@ async def llm_call(
parallel_tool_calls=parallel_tool_calls,
)
if response.choices[0].message.tool_calls:
tool_calls = [
ToolContentBlock(
id=tool.id,
type=tool.type,
function=ToolCall(
name=tool.function.name,
arguments=tool.function.arguments,
),
)
for tool in response.choices[0].message.tool_calls
]
else:
tool_calls = None
tool_calls = extract_openai_tool_calls(response)
reasoning = extract_openai_reasoning(response)
return LLMResponse(
raw_response=response.choices[0].message,
@@ -397,6 +427,7 @@ async def llm_call(
tool_calls=tool_calls,
prompt_tokens=response.usage.prompt_tokens if response.usage else 0,
completion_tokens=response.usage.completion_tokens if response.usage else 0,
reasoning=reasoning,
)
elif provider == "anthropic":
@@ -458,6 +489,12 @@ async def llm_call(
f"Tool use stop reason but no tool calls found in content. {resp}"
)
reasoning = None
# Take the first extended-thinking block from the response, if present.
for content_block in resp.content:
    if hasattr(content_block, "type") and content_block.type == "thinking":
        reasoning = content_block.thinking
        break
return LLMResponse(
raw_response=resp,
prompt=prompt,
@@ -469,6 +506,7 @@ async def llm_call(
tool_calls=tool_calls,
prompt_tokens=resp.usage.input_tokens,
completion_tokens=resp.usage.output_tokens,
reasoning=reasoning,
)
except anthropic.APIError as e:
error_message = f"Anthropic API error: {str(e)}"
@@ -493,6 +531,7 @@ async def llm_call(
tool_calls=None,
prompt_tokens=response.usage.prompt_tokens if response.usage else 0,
completion_tokens=response.usage.completion_tokens if response.usage else 0,
reasoning=None,
)
elif provider == "ollama":
if tools:
@@ -514,6 +553,7 @@ async def llm_call(
tool_calls=None,
prompt_tokens=response.get("prompt_eval_count") or 0,
completion_tokens=response.get("eval_count") or 0,
reasoning=None,
)
elif provider == "open_router":
tools_param = tools if tools else openai.NOT_GIVEN
@@ -522,6 +562,10 @@ async def llm_call(
api_key=credentials.api_key.get_secret_value(),
)
parallel_tool_calls_param = get_parallel_tool_calls_param(
llm_model, parallel_tool_calls
)
response = await client.chat.completions.create(
extra_headers={
"HTTP-Referer": "https://agpt.co",
@@ -531,6 +575,7 @@ async def llm_call(
messages=prompt, # type: ignore
max_tokens=max_tokens,
tools=tools_param, # type: ignore
parallel_tool_calls=parallel_tool_calls_param,
)
# If there's no response, raise an error
@@ -540,19 +585,8 @@ async def llm_call(
else:
raise ValueError("No response from OpenRouter.")
if response.choices[0].message.tool_calls:
tool_calls = [
ToolContentBlock(
id=tool.id,
type=tool.type,
function=ToolCall(
name=tool.function.name, arguments=tool.function.arguments
),
)
for tool in response.choices[0].message.tool_calls
]
else:
tool_calls = None
tool_calls = extract_openai_tool_calls(response)
reasoning = extract_openai_reasoning(response)
return LLMResponse(
raw_response=response.choices[0].message,
@@ -561,6 +595,7 @@ async def llm_call(
tool_calls=tool_calls,
prompt_tokens=response.usage.prompt_tokens if response.usage else 0,
completion_tokens=response.usage.completion_tokens if response.usage else 0,
reasoning=reasoning,
)
elif provider == "llama_api":
tools_param = tools if tools else openai.NOT_GIVEN
@@ -569,6 +604,10 @@ async def llm_call(
api_key=credentials.api_key.get_secret_value(),
)
parallel_tool_calls_param = get_parallel_tool_calls_param(
llm_model, parallel_tool_calls
)
response = await client.chat.completions.create(
extra_headers={
"HTTP-Referer": "https://agpt.co",
@@ -578,9 +617,7 @@ async def llm_call(
messages=prompt, # type: ignore
max_tokens=max_tokens,
tools=tools_param, # type: ignore
parallel_tool_calls=(
openai.NOT_GIVEN if parallel_tool_calls is None else parallel_tool_calls
),
parallel_tool_calls=parallel_tool_calls_param,
)
# If there's no response, raise an error
@@ -590,19 +627,8 @@ async def llm_call(
else:
raise ValueError("No response from Llama API.")
if response.choices[0].message.tool_calls:
tool_calls = [
ToolContentBlock(
id=tool.id,
type=tool.type,
function=ToolCall(
name=tool.function.name, arguments=tool.function.arguments
),
)
for tool in response.choices[0].message.tool_calls
]
else:
tool_calls = None
tool_calls = extract_openai_tool_calls(response)
reasoning = extract_openai_reasoning(response)
return LLMResponse(
raw_response=response.choices[0].message,
@@ -611,6 +637,7 @@ async def llm_call(
tool_calls=tool_calls,
prompt_tokens=response.usage.prompt_tokens if response.usage else 0,
completion_tokens=response.usage.completion_tokens if response.usage else 0,
reasoning=reasoning,
)
elif provider == "aiml_api":
client = openai.OpenAI(
@@ -634,6 +661,7 @@ async def llm_call(
completion_tokens=(
completion.usage.completion_tokens if completion.usage else 0
),
reasoning=None,
)
else:
raise ValueError(f"Unsupported LLM provider: {provider}")
@@ -747,6 +775,7 @@ class AIStructuredResponseGeneratorBlock(AIBlockBase):
tool_calls=None,
prompt_tokens=0,
completion_tokens=0,
reasoning=None,
)
},
)
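Taken together, every provider branch now returns an LLMResponse whose reasoning is either a string or None, and whose tool_calls list may contain several entries when the model called tools in parallel. A hypothetical consumer sketch (describe is not part of the diff; it only touches fields the diff shows):

def describe(response: LLMResponse) -> str:
    # Hypothetical logging helper: summarize what the model returned.
    parts = []
    if response.reasoning:
        parts.append(f"reasoning: {response.reasoning[:40]}...")
    for call in response.tool_calls or []:
        parts.append(f"tool call: {call.function.name}")
    return "; ".join(parts) or "plain text response"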

View File

@@ -452,28 +452,33 @@ class SmartDecisionMakerBlock(Block):
if pending_tool_calls and input_data.last_tool_output is None:
raise ValueError(f"Tool call requires an output for {pending_tool_calls}")
# Prefill all missing tool calls with the last tool output.
# TODO: we need a better way to handle this.
tool_output = [
_create_tool_response(pending_call_id, input_data.last_tool_output)
for pending_call_id, count in pending_tool_calls.items()
for _ in range(count)
]
# If the SDM block only calls 1 tool at a time, this should not happen.
if len(tool_output) > 1:
logger.warning(
f"[SmartDecisionMakerBlock-node_exec_id={node_exec_id}] "
f"Multiple pending tool calls are prefilled using a single output. "
f"Execution may not be accurate."
# Only assign the last tool output to the first pending tool call
tool_output = []
if pending_tool_calls and input_data.last_tool_output is not None:
# Get the first pending tool call ID
first_call_id = next(iter(pending_tool_calls.keys()))
tool_output.append(
_create_tool_response(first_call_id, input_data.last_tool_output)
)
# Add tool output to prompt right away
prompt.extend(tool_output)
# Check if there are still pending tool calls after handling the first one
remaining_pending_calls = get_pending_tool_calls(prompt)
# If there are still pending tool calls, yield the conversation and return early
if remaining_pending_calls:
yield "conversations", prompt
return
# Fall back to adding the tool output to the conversation history as a user prompt.
if len(tool_output) == 0 and input_data.last_tool_output:
logger.warning(
elif input_data.last_tool_output:
logger.error(
f"[SmartDecisionMakerBlock-node_exec_id={node_exec_id}] "
f"No pending tool calls found. This may indicate an issue with the "
f"conversation history, or an LLM calling two tools at the same time."
f"conversation history, or the tool giving response more than once."
f"This should not happen! Please check the conversation history for any inconsistencies."
)
tool_output.append(
{
@@ -481,8 +486,7 @@ class SmartDecisionMakerBlock(Block):
"content": f"Last tool output: {json.dumps(input_data.last_tool_output)}",
}
)
prompt.extend(tool_output)
prompt.extend(tool_output)
if input_data.multiple_tool_calls:
input_data.sys_prompt += "\nYou may call multiple tools, and the same tool more than once, in a single response."
else:
@@ -550,5 +554,11 @@ class SmartDecisionMakerBlock(Block):
else:
yield f"tools_^_{tool_name}_~_{arg_name}", None
response.prompt.append(response.raw_response)
yield "conversations", response.prompt
# Add reasoning to conversation history if available
if response.reasoning:
prompt.append(
{"role": "assistant", "content": f"[Reasoning]: {response.reasoning}"}
)
prompt.append(response.raw_response)
yield "conversations", prompt