Merge commit from fork

* fix(backend): add resource limits to Jinja2 template rendering

Prevent DoS via computational exhaustion in FillTextTemplateBlock by:

- Subclassing SandboxedEnvironment to intercept ** and * operators
  with caps on exponent size (1000) and string repeat length (10K)
- Replacing range() global with a capped version (max 10K items)
- Wrapping template.render() in a ThreadPoolExecutor with a 10s
  timeout to kill runaway expressions

Addresses GHSA-ppw9-h7rv-gwq9 (CWE-400).

* address review: move helpers after TextFormatter, drop ThreadPoolExecutor

- Move _safe_range and _RestrictedEnvironment below TextFormatter
  (helpers after the function that uses them)
- Remove ThreadPoolExecutor timeout wrapper from format_string() —
  it has problematic behavior in async contexts and the static
  interception (operator caps, range limit) already covers the
  known attack vectors

* address review: extend sequence guard, harden format_email, add tests

- Extend * guard to cover list and tuple repetition, not just strings
  (blocks {{ [0] * 999999999 }} and {{ (0,) * 999999999 }})
- Rename MAX_STRING_REPEAT → MAX_SEQUENCE_REPEAT
- Use _RestrictedEnvironment in format_email (defense-in-depth)
- Add tests: list repeat, tuple repeat, negative exponent, nested
  exponentiation (18 tests total)

* add async timeout wrapper at block level

Wrap format_string calls in FillTextTemplateBlock and AgentOutputBlock
with asyncio.wait_for(asyncio.to_thread(...), timeout=10s).

This provides defense-in-depth: if an expression somehow bypasses the
static operator checks, the async timeout will cancel it. Uses
asyncio.to_thread for proper async integration (no event loop blocking)
and asyncio.wait_for for real cancellation on timeout.

* make format_string async with timeout kwarg

Move asyncio.wait_for + asyncio.to_thread into format_string() itself
with a timeout kwarg (default 10s). This way all callers get the
timeout automatically — no wrapper needed at each call site.

- format_string() is now async, callers use await
- format_email() is now async (calls format_string internally)
- Updated all callers: text.py, io.py, llm.py, smart_decision_maker.py,
  email.py, notifications.py
- Tests updated to use asyncio.run()

* use Jinja2 native async rendering instead of to_thread

Switch from asyncio.to_thread(template.render) to Jinja2's native
enable_async=True + template.render_async(). No thread overhead,
proper async integration. asyncio.wait_for timeout still applies.

---------

Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
This commit is contained in:
Otto
2026-03-17 19:24:04 +00:00
committed by GitHub
parent 8b577ae194
commit 6d0e2063ec
8 changed files with 261 additions and 24 deletions

View File

@@ -211,7 +211,7 @@ class AgentOutputBlock(Block):
if input_data.format:
try:
formatter = TextFormatter(autoescape=input_data.escape_html)
yield "output", formatter.format_string(
yield "output", await formatter.format_string(
input_data.format, {input_data.name: input_data.value}
)
except Exception as e:

View File

@@ -1276,8 +1276,10 @@ class AIStructuredResponseGeneratorBlock(AIBlockBase):
values = input_data.prompt_values
if values:
input_data.prompt = fmt.format_string(input_data.prompt, values)
input_data.sys_prompt = fmt.format_string(input_data.sys_prompt, values)
input_data.prompt = await fmt.format_string(input_data.prompt, values)
input_data.sys_prompt = await fmt.format_string(
input_data.sys_prompt, values
)
if input_data.sys_prompt:
prompt.append({"role": "system", "content": input_data.sys_prompt})

View File

@@ -1050,8 +1050,12 @@ class SmartDecisionMakerBlock(Block):
values = input_data.prompt_values
if values:
input_data.prompt = llm.fmt.format_string(input_data.prompt, values)
input_data.sys_prompt = llm.fmt.format_string(input_data.sys_prompt, values)
input_data.prompt = await llm.fmt.format_string(
input_data.prompt, values
)
input_data.sys_prompt = await llm.fmt.format_string(
input_data.sys_prompt, values
)
if input_data.sys_prompt and not any(
p["role"] == "system" and p["content"].startswith(MAIN_OBJECTIVE_PREFIX)

View File

@@ -290,7 +290,9 @@ class FillTextTemplateBlock(Block):
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
formatter = text.TextFormatter(autoescape=input_data.escape_html)
yield "output", formatter.format_string(input_data.format, input_data.values)
yield "output", await formatter.format_string(
input_data.format, input_data.values
)
class CombineTextsBlock(Block):

View File

@@ -46,7 +46,7 @@ class EmailSender:
MAX_EMAIL_CHARS = 5_000_000 # ~5MB buffer
def send_templated(
async def send_templated(
self,
notification: NotificationType,
user_email: str,
@@ -71,7 +71,7 @@ class EmailSender:
template_data = {"notifications": data} if isinstance(data, list) else data
try:
subject, full_message = self.formatter.format_email(
subject, full_message = await self.formatter.format_email(
base_template=template.base_template,
subject_template=template.subject_template,
content_template=template.body_template,

View File

@@ -378,7 +378,7 @@ class NotificationManager(AppService):
continue
logger.info(f"{events=}")
self.email_sender.send_templated(
await self.email_sender.send_templated(
notification=notification_type,
user_email=recipient_email,
data=events,
@@ -600,7 +600,7 @@ class NotificationManager(AppService):
return False
logger.debug(f"Processing notification for admin: {event}")
recipient_email = settings.config.refund_notification_email
self.email_sender.send_templated(event.type, recipient_email, event)
await self.email_sender.send_templated(event.type, recipient_email, event)
return True
except Exception as e:
logger.exception(f"Error processing notification for admin queue: {e}")
@@ -632,7 +632,7 @@ class NotificationManager(AppService):
unsub_link = generate_unsubscribe_link(event.user_id)
self.email_sender.send_templated(
await self.email_sender.send_templated(
notification=event.type,
user_email=recipient_email,
data=event,
@@ -715,7 +715,7 @@ class NotificationManager(AppService):
try:
# Try to render the email to check its size
template = self.email_sender._get_template(event.type)
_, test_message = self.email_sender.formatter.format_email(
_, test_message = await self.email_sender.formatter.format_email(
base_template=template.base_template,
subject_template=template.subject_template,
content_template=template.body_template,
@@ -730,7 +730,7 @@ class NotificationManager(AppService):
f"(size: {len(test_message):,} chars)"
)
self.email_sender.send_templated(
await self.email_sender.send_templated(
notification=event.type,
user_email=recipient_email,
data=chunk,
@@ -975,7 +975,7 @@ class NotificationManager(AppService):
data=summary_data,
)
self.email_sender.send_templated(
await self.email_sender.send_templated(
notification=event.type,
user_email=recipient_email,
data=data,

View File

@@ -1,3 +1,4 @@
import asyncio
import logging
import re
@@ -10,6 +11,12 @@ from markupsafe import Markup
logger = logging.getLogger(__name__)
# Resource limits for template rendering
MAX_EXPONENT = 1000 # Max allowed exponent in ** operations
MAX_RANGE = 10_000 # Max items from range()
MAX_SEQUENCE_REPEAT = 10_000 # Max length from sequence * int
TEMPLATE_RENDER_TIMEOUT = 10 # Seconds before render is killed
def format_filter_for_jinja2(value, format_string=None):
if format_string:
@@ -19,9 +26,14 @@ def format_filter_for_jinja2(value, format_string=None):
class TextFormatter:
def __init__(self, autoescape: bool = True):
self.env = SandboxedEnvironment(loader=BaseLoader(), autoescape=autoescape)
self.env = _RestrictedEnvironment(
loader=BaseLoader(), autoescape=autoescape, enable_async=True
)
self.env.globals.clear()
# Replace range with a safe capped version
self.env.globals["range"] = _safe_range
# Instead of clearing all filters, just remove potentially unsafe ones
unsafe_filters = ["pprint", "tojson", "urlize", "xmlattr"]
for f in unsafe_filters:
@@ -101,15 +113,34 @@ class TextFormatter:
"img": ["src"],
}
def format_string(self, template_str: str, values=None, **kwargs) -> str:
"""Regular template rendering with escaping"""
async def format_string(
self,
template_str: str,
values=None,
*,
timeout: float | None = TEMPLATE_RENDER_TIMEOUT,
**kwargs,
) -> str:
"""Render a Jinja2 template with resource limits.
Uses Jinja2's native async rendering (``render_async``) with
``asyncio.wait_for`` as a defense-in-depth timeout.
"""
try:
template = self.env.from_string(template_str)
return template.render(values or {}, **kwargs)
coro = template.render_async(values or {}, **kwargs)
if timeout is not None:
return await asyncio.wait_for(coro, timeout=timeout)
return await coro
except TimeoutError:
raise ValueError(
f"Template rendering timed out after {timeout}s "
"(expression too complex)"
)
except TemplateError as e:
raise ValueError(e) from e
def format_email(
async def format_email(
self,
subject_template: str,
base_template: str,
@@ -121,7 +152,7 @@ class TextFormatter:
Special handling for email templates where content needs to be rendered as HTML
"""
# First render the content template
content = self.format_string(content_template, data, **kwargs)
content = await self.format_string(content_template, data, **kwargs)
# Clean the HTML + CSS but don't escape it
clean_content = bleach.clean(
@@ -136,17 +167,21 @@ class TextFormatter:
safe_content = Markup(clean_content)
# Render subject
rendered_subject_template = self.format_string(subject_template, data, **kwargs)
rendered_subject_template = await self.format_string(
subject_template, data, **kwargs
)
# Create new env just for HTML template
html_env = SandboxedEnvironment(loader=BaseLoader(), autoescape=True)
# Create restricted env for HTML template (defense-in-depth)
html_env = _RestrictedEnvironment(
loader=BaseLoader(), autoescape=True, enable_async=True
)
html_env.filters["safe"] = lambda x: (
x if isinstance(x, Markup) else Markup(str(x))
)
# Render base template with the safe content
template = html_env.from_string(base_template)
rendered_base_template = template.render(
rendered_base_template = await template.render_async(
data={
"message": safe_content,
"title": rendered_subject_template,
@@ -157,6 +192,72 @@ class TextFormatter:
return rendered_subject_template, rendered_base_template
def _safe_range(*args: int) -> range:
"""range() replacement that caps the number of items to prevent DoS."""
r = range(*args)
if len(r) > MAX_RANGE:
raise OverflowError(
f"range() too large ({len(r)} items, max {MAX_RANGE})"
)
return r
class _RestrictedEnvironment(SandboxedEnvironment):
"""SandboxedEnvironment with computational complexity limits.
Prevents resource-exhaustion attacks such as ``{{ 999999999**999999999 }}``
or ``{{ range(999999999) | list }}`` by intercepting dangerous builtins.
"""
# Tell Jinja2 to route these operators through call_binop()
intercepted_binops = frozenset(["**", "*"])
def call(
__self, # noqa: N805 Jinja2 convention
__context,
__obj,
*args,
**kwargs,
):
# Intercept pow() to cap the exponent
if __obj is pow and len(args) >= 2:
base, exp = args[0], args[1]
if isinstance(exp, (int, float)) and abs(exp) > MAX_EXPONENT:
raise OverflowError(
f"Exponent too large (max {MAX_EXPONENT})"
)
if isinstance(base, (int, float)) and abs(base) > MAX_EXPONENT:
raise OverflowError(
f"Base too large for exponentiation (max {MAX_EXPONENT})"
)
return super().call(__context, __obj, *args, **kwargs)
def call_binop(self, context, operator, left, right):
# Intercept the ** (power) operator
if operator == "**":
if isinstance(right, (int, float)) and abs(right) > MAX_EXPONENT:
raise OverflowError(
f"Exponent too large (max {MAX_EXPONENT})"
)
if isinstance(left, (int, float)) and abs(left) > MAX_EXPONENT:
raise OverflowError(
f"Base too large for exponentiation (max {MAX_EXPONENT})"
)
# Intercept sequence repetition via * (strings, lists, tuples)
if operator == "*":
if isinstance(left, (str, list, tuple)) and isinstance(right, int):
if len(left) * right > MAX_SEQUENCE_REPEAT:
raise OverflowError(
f"Sequence repeat too large (max {MAX_SEQUENCE_REPEAT} items)"
)
if isinstance(right, (str, list, tuple)) and isinstance(left, int):
if len(right) * left > MAX_SEQUENCE_REPEAT:
raise OverflowError(
f"Sequence repeat too large (max {MAX_SEQUENCE_REPEAT} items)"
)
return super().call_binop(context, operator, left, right)
# ---------------------------------------------------------------------------
# CamelCase splitting
# ---------------------------------------------------------------------------

View File

@@ -0,0 +1,128 @@
"""Tests for TextFormatter resource-exhaustion protections (GHSA-ppw9-h7rv-gwq9)."""
import asyncio
import pytest
from backend.util.text import TextFormatter
@pytest.fixture
def fmt() -> TextFormatter:
return TextFormatter(autoescape=False)
# --- Normal usage should still work ---
def test_simple_variable_substitution(fmt: TextFormatter):
result = asyncio.run(fmt.format_string("Hello, {{ name }}!", {"name": "Alice"}))
assert result == "Hello, Alice!"
def test_loop(fmt: TextFormatter):
result = asyncio.run(
fmt.format_string(
"{% for item in items %}{{ item }} {% endfor %}",
{"items": ["a", "b", "c"]},
)
)
assert result == "a b c "
def test_conditional(fmt: TextFormatter):
result = asyncio.run(
fmt.format_string(
"{% if x > 5 %}big{% else %}small{% endif %}",
{"x": 10},
)
)
assert result == "big"
def test_safe_range(fmt: TextFormatter):
result = asyncio.run(
fmt.format_string("{% for i in range(5) %}{{ i }}{% endfor %}")
)
assert result == "01234"
def test_small_exponent(fmt: TextFormatter):
result = asyncio.run(fmt.format_string("{{ 2**10 }}"))
assert result == "1024"
def test_safe_string_repeat(fmt: TextFormatter):
result = asyncio.run(fmt.format_string("{{ 'ab' * 3 }}"))
assert result == "ababab"
# --- Resource-exhaustion attacks should be blocked ---
def test_huge_exponent_blocked(fmt: TextFormatter):
with pytest.raises((ValueError, OverflowError)):
asyncio.run(fmt.format_string("{{ 999999999**999999999 }}"))
def test_large_base_blocked(fmt: TextFormatter):
with pytest.raises((ValueError, OverflowError)):
asyncio.run(fmt.format_string("{{ 999999999**100 }}"))
def test_negative_exponent_blocked(fmt: TextFormatter):
with pytest.raises((ValueError, OverflowError)):
asyncio.run(fmt.format_string("{{ 2**-99999 }}"))
def test_huge_range_blocked(fmt: TextFormatter):
with pytest.raises((ValueError, OverflowError)):
asyncio.run(fmt.format_string("{{ range(999999999) | list }}"))
def test_large_string_repeat_blocked(fmt: TextFormatter):
with pytest.raises((ValueError, OverflowError)):
asyncio.run(fmt.format_string("{{ 'A' * 100000 }}"))
def test_large_list_repeat_blocked(fmt: TextFormatter):
with pytest.raises((ValueError, OverflowError)):
asyncio.run(fmt.format_string("{{ [0] * 999999999 }}"))
def test_large_tuple_repeat_blocked(fmt: TextFormatter):
with pytest.raises((ValueError, OverflowError)):
asyncio.run(fmt.format_string("{{ (0,) * 999999999 }}"))
def test_pow_function_blocked(fmt: TextFormatter):
"""pow() builtin should also be capped."""
with pytest.raises((ValueError, OverflowError)):
asyncio.run(fmt.format_string("{{ pow(2, 999999) }}"))
def test_nested_exponentiation_blocked(fmt: TextFormatter):
"""{{ 2 ** (2 ** 100) }} — inner result exceeds MAX_EXPONENT as base."""
with pytest.raises((ValueError, OverflowError)):
asyncio.run(fmt.format_string("{{ 2 ** (2 ** 100) }}"))
# --- Edge cases ---
def test_moderate_range_allowed(fmt: TextFormatter):
"""range(100) should work fine — well under the limit."""
result = asyncio.run(fmt.format_string("{{ range(100) | list | length }}"))
assert result == "100"
def test_boundary_range(fmt: TextFormatter):
"""range(10000) is exactly at the limit — should work."""
result = asyncio.run(fmt.format_string("{{ range(10000) | list | length }}"))
assert result == "10000"
def test_over_boundary_range(fmt: TextFormatter):
"""range(10001) exceeds the limit — should fail."""
with pytest.raises((ValueError, OverflowError)):
asyncio.run(fmt.format_string("{{ range(10001) | list }}"))