mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
Merge commit from fork
* fix(backend): add resource limits to Jinja2 template rendering
Prevent DoS via computational exhaustion in FillTextTemplateBlock by:
- Subclassing SandboxedEnvironment to intercept ** and * operators
with caps on exponent size (1000) and string repeat length (10K)
- Replacing range() global with a capped version (max 10K items)
- Wrapping template.render() in a ThreadPoolExecutor with a 10s
timeout to kill runaway expressions
Addresses GHSA-ppw9-h7rv-gwq9 (CWE-400).
* address review: move helpers after TextFormatter, drop ThreadPoolExecutor
- Move _safe_range and _RestrictedEnvironment below TextFormatter
(helpers after the function that uses them)
- Remove ThreadPoolExecutor timeout wrapper from format_string() —
it has problematic behavior in async contexts and the static
interception (operator caps, range limit) already covers the
known attack vectors
* address review: extend sequence guard, harden format_email, add tests
- Extend * guard to cover list and tuple repetition, not just strings
(blocks {{ [0] * 999999999 }} and {{ (0,) * 999999999 }})
- Rename MAX_STRING_REPEAT → MAX_SEQUENCE_REPEAT
- Use _RestrictedEnvironment in format_email (defense-in-depth)
- Add tests: list repeat, tuple repeat, negative exponent, nested
exponentiation (18 tests total)
* add async timeout wrapper at block level
Wrap format_string calls in FillTextTemplateBlock and AgentOutputBlock
with asyncio.wait_for(asyncio.to_thread(...), timeout=10s).
This provides defense-in-depth: if an expression somehow bypasses the
static operator checks, the async timeout will cancel it. Uses
asyncio.to_thread for proper async integration (no event loop blocking)
and asyncio.wait_for for real cancellation on timeout.
* make format_string async with timeout kwarg
Move asyncio.wait_for + asyncio.to_thread into format_string() itself
with a timeout kwarg (default 10s). This way all callers get the
timeout automatically — no wrapper needed at each call site.
- format_string() is now async, callers use await
- format_email() is now async (calls format_string internally)
- Updated all callers: text.py, io.py, llm.py, smart_decision_maker.py,
email.py, notifications.py
- Tests updated to use asyncio.run()
* use Jinja2 native async rendering instead of to_thread
Switch from asyncio.to_thread(template.render) to Jinja2's native
enable_async=True + template.render_async(). No thread overhead,
proper async integration. asyncio.wait_for timeout still applies.
---------
Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
This commit is contained in:
@@ -211,7 +211,7 @@ class AgentOutputBlock(Block):
|
|||||||
if input_data.format:
|
if input_data.format:
|
||||||
try:
|
try:
|
||||||
formatter = TextFormatter(autoescape=input_data.escape_html)
|
formatter = TextFormatter(autoescape=input_data.escape_html)
|
||||||
yield "output", formatter.format_string(
|
yield "output", await formatter.format_string(
|
||||||
input_data.format, {input_data.name: input_data.value}
|
input_data.format, {input_data.name: input_data.value}
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -1276,8 +1276,10 @@ class AIStructuredResponseGeneratorBlock(AIBlockBase):
|
|||||||
|
|
||||||
values = input_data.prompt_values
|
values = input_data.prompt_values
|
||||||
if values:
|
if values:
|
||||||
input_data.prompt = fmt.format_string(input_data.prompt, values)
|
input_data.prompt = await fmt.format_string(input_data.prompt, values)
|
||||||
input_data.sys_prompt = fmt.format_string(input_data.sys_prompt, values)
|
input_data.sys_prompt = await fmt.format_string(
|
||||||
|
input_data.sys_prompt, values
|
||||||
|
)
|
||||||
|
|
||||||
if input_data.sys_prompt:
|
if input_data.sys_prompt:
|
||||||
prompt.append({"role": "system", "content": input_data.sys_prompt})
|
prompt.append({"role": "system", "content": input_data.sys_prompt})
|
||||||
|
|||||||
@@ -1050,8 +1050,12 @@ class SmartDecisionMakerBlock(Block):
|
|||||||
|
|
||||||
values = input_data.prompt_values
|
values = input_data.prompt_values
|
||||||
if values:
|
if values:
|
||||||
input_data.prompt = llm.fmt.format_string(input_data.prompt, values)
|
input_data.prompt = await llm.fmt.format_string(
|
||||||
input_data.sys_prompt = llm.fmt.format_string(input_data.sys_prompt, values)
|
input_data.prompt, values
|
||||||
|
)
|
||||||
|
input_data.sys_prompt = await llm.fmt.format_string(
|
||||||
|
input_data.sys_prompt, values
|
||||||
|
)
|
||||||
|
|
||||||
if input_data.sys_prompt and not any(
|
if input_data.sys_prompt and not any(
|
||||||
p["role"] == "system" and p["content"].startswith(MAIN_OBJECTIVE_PREFIX)
|
p["role"] == "system" and p["content"].startswith(MAIN_OBJECTIVE_PREFIX)
|
||||||
|
|||||||
@@ -290,7 +290,9 @@ class FillTextTemplateBlock(Block):
|
|||||||
|
|
||||||
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
||||||
formatter = text.TextFormatter(autoescape=input_data.escape_html)
|
formatter = text.TextFormatter(autoescape=input_data.escape_html)
|
||||||
yield "output", formatter.format_string(input_data.format, input_data.values)
|
yield "output", await formatter.format_string(
|
||||||
|
input_data.format, input_data.values
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class CombineTextsBlock(Block):
|
class CombineTextsBlock(Block):
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ class EmailSender:
|
|||||||
|
|
||||||
MAX_EMAIL_CHARS = 5_000_000 # ~5MB buffer
|
MAX_EMAIL_CHARS = 5_000_000 # ~5MB buffer
|
||||||
|
|
||||||
def send_templated(
|
async def send_templated(
|
||||||
self,
|
self,
|
||||||
notification: NotificationType,
|
notification: NotificationType,
|
||||||
user_email: str,
|
user_email: str,
|
||||||
@@ -71,7 +71,7 @@ class EmailSender:
|
|||||||
template_data = {"notifications": data} if isinstance(data, list) else data
|
template_data = {"notifications": data} if isinstance(data, list) else data
|
||||||
|
|
||||||
try:
|
try:
|
||||||
subject, full_message = self.formatter.format_email(
|
subject, full_message = await self.formatter.format_email(
|
||||||
base_template=template.base_template,
|
base_template=template.base_template,
|
||||||
subject_template=template.subject_template,
|
subject_template=template.subject_template,
|
||||||
content_template=template.body_template,
|
content_template=template.body_template,
|
||||||
|
|||||||
@@ -378,7 +378,7 @@ class NotificationManager(AppService):
|
|||||||
continue
|
continue
|
||||||
logger.info(f"{events=}")
|
logger.info(f"{events=}")
|
||||||
|
|
||||||
self.email_sender.send_templated(
|
await self.email_sender.send_templated(
|
||||||
notification=notification_type,
|
notification=notification_type,
|
||||||
user_email=recipient_email,
|
user_email=recipient_email,
|
||||||
data=events,
|
data=events,
|
||||||
@@ -600,7 +600,7 @@ class NotificationManager(AppService):
|
|||||||
return False
|
return False
|
||||||
logger.debug(f"Processing notification for admin: {event}")
|
logger.debug(f"Processing notification for admin: {event}")
|
||||||
recipient_email = settings.config.refund_notification_email
|
recipient_email = settings.config.refund_notification_email
|
||||||
self.email_sender.send_templated(event.type, recipient_email, event)
|
await self.email_sender.send_templated(event.type, recipient_email, event)
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(f"Error processing notification for admin queue: {e}")
|
logger.exception(f"Error processing notification for admin queue: {e}")
|
||||||
@@ -632,7 +632,7 @@ class NotificationManager(AppService):
|
|||||||
|
|
||||||
unsub_link = generate_unsubscribe_link(event.user_id)
|
unsub_link = generate_unsubscribe_link(event.user_id)
|
||||||
|
|
||||||
self.email_sender.send_templated(
|
await self.email_sender.send_templated(
|
||||||
notification=event.type,
|
notification=event.type,
|
||||||
user_email=recipient_email,
|
user_email=recipient_email,
|
||||||
data=event,
|
data=event,
|
||||||
@@ -715,7 +715,7 @@ class NotificationManager(AppService):
|
|||||||
try:
|
try:
|
||||||
# Try to render the email to check its size
|
# Try to render the email to check its size
|
||||||
template = self.email_sender._get_template(event.type)
|
template = self.email_sender._get_template(event.type)
|
||||||
_, test_message = self.email_sender.formatter.format_email(
|
_, test_message = await self.email_sender.formatter.format_email(
|
||||||
base_template=template.base_template,
|
base_template=template.base_template,
|
||||||
subject_template=template.subject_template,
|
subject_template=template.subject_template,
|
||||||
content_template=template.body_template,
|
content_template=template.body_template,
|
||||||
@@ -730,7 +730,7 @@ class NotificationManager(AppService):
|
|||||||
f"(size: {len(test_message):,} chars)"
|
f"(size: {len(test_message):,} chars)"
|
||||||
)
|
)
|
||||||
|
|
||||||
self.email_sender.send_templated(
|
await self.email_sender.send_templated(
|
||||||
notification=event.type,
|
notification=event.type,
|
||||||
user_email=recipient_email,
|
user_email=recipient_email,
|
||||||
data=chunk,
|
data=chunk,
|
||||||
@@ -975,7 +975,7 @@ class NotificationManager(AppService):
|
|||||||
data=summary_data,
|
data=summary_data,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.email_sender.send_templated(
|
await self.email_sender.send_templated(
|
||||||
notification=event.type,
|
notification=event.type,
|
||||||
user_email=recipient_email,
|
user_email=recipient_email,
|
||||||
data=data,
|
data=data,
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
|
|
||||||
@@ -10,6 +11,12 @@ from markupsafe import Markup
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Resource limits for template rendering
|
||||||
|
MAX_EXPONENT = 1000 # Max allowed exponent in ** operations
|
||||||
|
MAX_RANGE = 10_000 # Max items from range()
|
||||||
|
MAX_SEQUENCE_REPEAT = 10_000 # Max length from sequence * int
|
||||||
|
TEMPLATE_RENDER_TIMEOUT = 10 # Seconds before render is killed
|
||||||
|
|
||||||
|
|
||||||
def format_filter_for_jinja2(value, format_string=None):
|
def format_filter_for_jinja2(value, format_string=None):
|
||||||
if format_string:
|
if format_string:
|
||||||
@@ -19,9 +26,14 @@ def format_filter_for_jinja2(value, format_string=None):
|
|||||||
|
|
||||||
class TextFormatter:
|
class TextFormatter:
|
||||||
def __init__(self, autoescape: bool = True):
|
def __init__(self, autoescape: bool = True):
|
||||||
self.env = SandboxedEnvironment(loader=BaseLoader(), autoescape=autoescape)
|
self.env = _RestrictedEnvironment(
|
||||||
|
loader=BaseLoader(), autoescape=autoescape, enable_async=True
|
||||||
|
)
|
||||||
self.env.globals.clear()
|
self.env.globals.clear()
|
||||||
|
|
||||||
|
# Replace range with a safe capped version
|
||||||
|
self.env.globals["range"] = _safe_range
|
||||||
|
|
||||||
# Instead of clearing all filters, just remove potentially unsafe ones
|
# Instead of clearing all filters, just remove potentially unsafe ones
|
||||||
unsafe_filters = ["pprint", "tojson", "urlize", "xmlattr"]
|
unsafe_filters = ["pprint", "tojson", "urlize", "xmlattr"]
|
||||||
for f in unsafe_filters:
|
for f in unsafe_filters:
|
||||||
@@ -101,15 +113,34 @@ class TextFormatter:
|
|||||||
"img": ["src"],
|
"img": ["src"],
|
||||||
}
|
}
|
||||||
|
|
||||||
def format_string(self, template_str: str, values=None, **kwargs) -> str:
|
async def format_string(
|
||||||
"""Regular template rendering with escaping"""
|
self,
|
||||||
|
template_str: str,
|
||||||
|
values=None,
|
||||||
|
*,
|
||||||
|
timeout: float | None = TEMPLATE_RENDER_TIMEOUT,
|
||||||
|
**kwargs,
|
||||||
|
) -> str:
|
||||||
|
"""Render a Jinja2 template with resource limits.
|
||||||
|
|
||||||
|
Uses Jinja2's native async rendering (``render_async``) with
|
||||||
|
``asyncio.wait_for`` as a defense-in-depth timeout.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
template = self.env.from_string(template_str)
|
template = self.env.from_string(template_str)
|
||||||
return template.render(values or {}, **kwargs)
|
coro = template.render_async(values or {}, **kwargs)
|
||||||
|
if timeout is not None:
|
||||||
|
return await asyncio.wait_for(coro, timeout=timeout)
|
||||||
|
return await coro
|
||||||
|
except TimeoutError:
|
||||||
|
raise ValueError(
|
||||||
|
f"Template rendering timed out after {timeout}s "
|
||||||
|
"(expression too complex)"
|
||||||
|
)
|
||||||
except TemplateError as e:
|
except TemplateError as e:
|
||||||
raise ValueError(e) from e
|
raise ValueError(e) from e
|
||||||
|
|
||||||
def format_email(
|
async def format_email(
|
||||||
self,
|
self,
|
||||||
subject_template: str,
|
subject_template: str,
|
||||||
base_template: str,
|
base_template: str,
|
||||||
@@ -121,7 +152,7 @@ class TextFormatter:
|
|||||||
Special handling for email templates where content needs to be rendered as HTML
|
Special handling for email templates where content needs to be rendered as HTML
|
||||||
"""
|
"""
|
||||||
# First render the content template
|
# First render the content template
|
||||||
content = self.format_string(content_template, data, **kwargs)
|
content = await self.format_string(content_template, data, **kwargs)
|
||||||
|
|
||||||
# Clean the HTML + CSS but don't escape it
|
# Clean the HTML + CSS but don't escape it
|
||||||
clean_content = bleach.clean(
|
clean_content = bleach.clean(
|
||||||
@@ -136,17 +167,21 @@ class TextFormatter:
|
|||||||
safe_content = Markup(clean_content)
|
safe_content = Markup(clean_content)
|
||||||
|
|
||||||
# Render subject
|
# Render subject
|
||||||
rendered_subject_template = self.format_string(subject_template, data, **kwargs)
|
rendered_subject_template = await self.format_string(
|
||||||
|
subject_template, data, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
# Create new env just for HTML template
|
# Create restricted env for HTML template (defense-in-depth)
|
||||||
html_env = SandboxedEnvironment(loader=BaseLoader(), autoescape=True)
|
html_env = _RestrictedEnvironment(
|
||||||
|
loader=BaseLoader(), autoescape=True, enable_async=True
|
||||||
|
)
|
||||||
html_env.filters["safe"] = lambda x: (
|
html_env.filters["safe"] = lambda x: (
|
||||||
x if isinstance(x, Markup) else Markup(str(x))
|
x if isinstance(x, Markup) else Markup(str(x))
|
||||||
)
|
)
|
||||||
|
|
||||||
# Render base template with the safe content
|
# Render base template with the safe content
|
||||||
template = html_env.from_string(base_template)
|
template = html_env.from_string(base_template)
|
||||||
rendered_base_template = template.render(
|
rendered_base_template = await template.render_async(
|
||||||
data={
|
data={
|
||||||
"message": safe_content,
|
"message": safe_content,
|
||||||
"title": rendered_subject_template,
|
"title": rendered_subject_template,
|
||||||
@@ -157,6 +192,72 @@ class TextFormatter:
|
|||||||
return rendered_subject_template, rendered_base_template
|
return rendered_subject_template, rendered_base_template
|
||||||
|
|
||||||
|
|
||||||
|
def _safe_range(*args: int) -> range:
|
||||||
|
"""range() replacement that caps the number of items to prevent DoS."""
|
||||||
|
r = range(*args)
|
||||||
|
if len(r) > MAX_RANGE:
|
||||||
|
raise OverflowError(
|
||||||
|
f"range() too large ({len(r)} items, max {MAX_RANGE})"
|
||||||
|
)
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
class _RestrictedEnvironment(SandboxedEnvironment):
|
||||||
|
"""SandboxedEnvironment with computational complexity limits.
|
||||||
|
|
||||||
|
Prevents resource-exhaustion attacks such as ``{{ 999999999**999999999 }}``
|
||||||
|
or ``{{ range(999999999) | list }}`` by intercepting dangerous builtins.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Tell Jinja2 to route these operators through call_binop()
|
||||||
|
intercepted_binops = frozenset(["**", "*"])
|
||||||
|
|
||||||
|
def call(
|
||||||
|
__self, # noqa: N805 – Jinja2 convention
|
||||||
|
__context,
|
||||||
|
__obj,
|
||||||
|
*args,
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
|
# Intercept pow() to cap the exponent
|
||||||
|
if __obj is pow and len(args) >= 2:
|
||||||
|
base, exp = args[0], args[1]
|
||||||
|
if isinstance(exp, (int, float)) and abs(exp) > MAX_EXPONENT:
|
||||||
|
raise OverflowError(
|
||||||
|
f"Exponent too large (max {MAX_EXPONENT})"
|
||||||
|
)
|
||||||
|
if isinstance(base, (int, float)) and abs(base) > MAX_EXPONENT:
|
||||||
|
raise OverflowError(
|
||||||
|
f"Base too large for exponentiation (max {MAX_EXPONENT})"
|
||||||
|
)
|
||||||
|
return super().call(__context, __obj, *args, **kwargs)
|
||||||
|
|
||||||
|
def call_binop(self, context, operator, left, right):
|
||||||
|
# Intercept the ** (power) operator
|
||||||
|
if operator == "**":
|
||||||
|
if isinstance(right, (int, float)) and abs(right) > MAX_EXPONENT:
|
||||||
|
raise OverflowError(
|
||||||
|
f"Exponent too large (max {MAX_EXPONENT})"
|
||||||
|
)
|
||||||
|
if isinstance(left, (int, float)) and abs(left) > MAX_EXPONENT:
|
||||||
|
raise OverflowError(
|
||||||
|
f"Base too large for exponentiation (max {MAX_EXPONENT})"
|
||||||
|
)
|
||||||
|
# Intercept sequence repetition via * (strings, lists, tuples)
|
||||||
|
if operator == "*":
|
||||||
|
if isinstance(left, (str, list, tuple)) and isinstance(right, int):
|
||||||
|
if len(left) * right > MAX_SEQUENCE_REPEAT:
|
||||||
|
raise OverflowError(
|
||||||
|
f"Sequence repeat too large (max {MAX_SEQUENCE_REPEAT} items)"
|
||||||
|
)
|
||||||
|
if isinstance(right, (str, list, tuple)) and isinstance(left, int):
|
||||||
|
if len(right) * left > MAX_SEQUENCE_REPEAT:
|
||||||
|
raise OverflowError(
|
||||||
|
f"Sequence repeat too large (max {MAX_SEQUENCE_REPEAT} items)"
|
||||||
|
)
|
||||||
|
return super().call_binop(context, operator, left, right)
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# CamelCase splitting
|
# CamelCase splitting
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
128
autogpt_platform/backend/backend/util/text_test.py
Normal file
128
autogpt_platform/backend/backend/util/text_test.py
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
"""Tests for TextFormatter resource-exhaustion protections (GHSA-ppw9-h7rv-gwq9)."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from backend.util.text import TextFormatter
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def fmt() -> TextFormatter:
|
||||||
|
return TextFormatter(autoescape=False)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Normal usage should still work ---
|
||||||
|
|
||||||
|
|
||||||
|
def test_simple_variable_substitution(fmt: TextFormatter):
|
||||||
|
result = asyncio.run(fmt.format_string("Hello, {{ name }}!", {"name": "Alice"}))
|
||||||
|
assert result == "Hello, Alice!"
|
||||||
|
|
||||||
|
|
||||||
|
def test_loop(fmt: TextFormatter):
|
||||||
|
result = asyncio.run(
|
||||||
|
fmt.format_string(
|
||||||
|
"{% for item in items %}{{ item }} {% endfor %}",
|
||||||
|
{"items": ["a", "b", "c"]},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert result == "a b c "
|
||||||
|
|
||||||
|
|
||||||
|
def test_conditional(fmt: TextFormatter):
|
||||||
|
result = asyncio.run(
|
||||||
|
fmt.format_string(
|
||||||
|
"{% if x > 5 %}big{% else %}small{% endif %}",
|
||||||
|
{"x": 10},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert result == "big"
|
||||||
|
|
||||||
|
|
||||||
|
def test_safe_range(fmt: TextFormatter):
|
||||||
|
result = asyncio.run(
|
||||||
|
fmt.format_string("{% for i in range(5) %}{{ i }}{% endfor %}")
|
||||||
|
)
|
||||||
|
assert result == "01234"
|
||||||
|
|
||||||
|
|
||||||
|
def test_small_exponent(fmt: TextFormatter):
|
||||||
|
result = asyncio.run(fmt.format_string("{{ 2**10 }}"))
|
||||||
|
assert result == "1024"
|
||||||
|
|
||||||
|
|
||||||
|
def test_safe_string_repeat(fmt: TextFormatter):
|
||||||
|
result = asyncio.run(fmt.format_string("{{ 'ab' * 3 }}"))
|
||||||
|
assert result == "ababab"
|
||||||
|
|
||||||
|
|
||||||
|
# --- Resource-exhaustion attacks should be blocked ---
|
||||||
|
|
||||||
|
|
||||||
|
def test_huge_exponent_blocked(fmt: TextFormatter):
|
||||||
|
with pytest.raises((ValueError, OverflowError)):
|
||||||
|
asyncio.run(fmt.format_string("{{ 999999999**999999999 }}"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_large_base_blocked(fmt: TextFormatter):
|
||||||
|
with pytest.raises((ValueError, OverflowError)):
|
||||||
|
asyncio.run(fmt.format_string("{{ 999999999**100 }}"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_negative_exponent_blocked(fmt: TextFormatter):
|
||||||
|
with pytest.raises((ValueError, OverflowError)):
|
||||||
|
asyncio.run(fmt.format_string("{{ 2**-99999 }}"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_huge_range_blocked(fmt: TextFormatter):
|
||||||
|
with pytest.raises((ValueError, OverflowError)):
|
||||||
|
asyncio.run(fmt.format_string("{{ range(999999999) | list }}"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_large_string_repeat_blocked(fmt: TextFormatter):
|
||||||
|
with pytest.raises((ValueError, OverflowError)):
|
||||||
|
asyncio.run(fmt.format_string("{{ 'A' * 100000 }}"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_large_list_repeat_blocked(fmt: TextFormatter):
|
||||||
|
with pytest.raises((ValueError, OverflowError)):
|
||||||
|
asyncio.run(fmt.format_string("{{ [0] * 999999999 }}"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_large_tuple_repeat_blocked(fmt: TextFormatter):
|
||||||
|
with pytest.raises((ValueError, OverflowError)):
|
||||||
|
asyncio.run(fmt.format_string("{{ (0,) * 999999999 }}"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_pow_function_blocked(fmt: TextFormatter):
|
||||||
|
"""pow() builtin should also be capped."""
|
||||||
|
with pytest.raises((ValueError, OverflowError)):
|
||||||
|
asyncio.run(fmt.format_string("{{ pow(2, 999999) }}"))
|
||||||
|
|
||||||
|
|
||||||
|
def test_nested_exponentiation_blocked(fmt: TextFormatter):
|
||||||
|
"""{{ 2 ** (2 ** 100) }} — inner result exceeds MAX_EXPONENT as base."""
|
||||||
|
with pytest.raises((ValueError, OverflowError)):
|
||||||
|
asyncio.run(fmt.format_string("{{ 2 ** (2 ** 100) }}"))
|
||||||
|
|
||||||
|
|
||||||
|
# --- Edge cases ---
|
||||||
|
|
||||||
|
|
||||||
|
def test_moderate_range_allowed(fmt: TextFormatter):
|
||||||
|
"""range(100) should work fine — well under the limit."""
|
||||||
|
result = asyncio.run(fmt.format_string("{{ range(100) | list | length }}"))
|
||||||
|
assert result == "100"
|
||||||
|
|
||||||
|
|
||||||
|
def test_boundary_range(fmt: TextFormatter):
|
||||||
|
"""range(10000) is exactly at the limit — should work."""
|
||||||
|
result = asyncio.run(fmt.format_string("{{ range(10000) | list | length }}"))
|
||||||
|
assert result == "10000"
|
||||||
|
|
||||||
|
|
||||||
|
def test_over_boundary_range(fmt: TextFormatter):
|
||||||
|
"""range(10001) exceeds the limit — should fail."""
|
||||||
|
with pytest.raises((ValueError, OverflowError)):
|
||||||
|
asyncio.run(fmt.format_string("{{ range(10001) | list }}"))
|
||||||
Reference in New Issue
Block a user