Compare commits

...

1 Commits

Author SHA1 Message Date
Xingyao Wang 8391bb432b [Automations Phase 1] Task 3: Scheduler (#13331)
Co-authored-by: openhands <openhands@all-hands.dev>
2026-03-11 17:49:25 +00:00
6 changed files with 783 additions and 0 deletions
+2
View File
@@ -14,6 +14,7 @@ readme = "README.md"
repository = "https://github.com/OpenHands/OpenHands"
packages = [
{ include = "server" },
{ include = "services" },
{ include = "storage" },
{ include = "sync" },
{ include = "integrations" },
@@ -49,6 +50,7 @@ pandas = "^2.2.0"
numpy = "^2.2.0"
mcp = "^1.10.0"
pillow = "^12.1.1"
croniter = "^6.0.0"
[tool.poetry.group.dev.dependencies]
ruff = "0.8.3"
+33
View File
@@ -0,0 +1,33 @@
"""Entry point for the automation scheduler CronJob.
Usage: python -m run_automation_scheduler
This runs as a Kubernetes CronJob (every minute). It evaluates cron schedules
for all enabled automations and inserts events for those that are due.
The process runs, evaluates, inserts events, and exits — it is NOT a
long-running daemon.
"""
import asyncio
import sys
from server.logger import logger
from services.automation_scheduler import run_scheduler
from storage.database import a_session_maker
async def main() -> int:
    """Run one scheduler pass and map the outcome to a process exit code.

    Returns:
        0 when the pass completed, 1 when any exception escaped it.
    """
    try:
        async with a_session_maker() as session:
            created = await run_scheduler(session)
        logger.info('Automation scheduler finished: %d events created', created)
    except Exception:
        # Top-level boundary of the job: record the traceback and report
        # failure through the exit code instead of crashing.
        logger.exception('Error running automation scheduler')
        return 1
    return 0


if __name__ == '__main__':
    sys.exit(asyncio.run(main()))
View File
+152
View File
@@ -0,0 +1,152 @@
"""Automation scheduler — evaluates cron schedules and inserts tick events.
This module is the core logic for the automation scheduler CronJob.
It queries all enabled cron automations, determines which are due based on
their cron expressions and last_triggered_at timestamps, and inserts events
into the automation_events inbox table for the executor to process.
The scheduler does NOT create automation_runs — that's the executor's job.
"""
from __future__ import annotations
import logging
from datetime import datetime
from datetime import timezone as tz
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
from croniter import croniter
from services.automation_event_publisher import (
pg_notify_new_event,
publish_automation_event,
)
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from storage.automation import Automation
logger = logging.getLogger(__name__)
async def run_scheduler(session: AsyncSession) -> int:
    """Run one scheduling pass over every enabled cron automation.

    Each matching automation is handed to ``_process_automation``. A failure
    in one automation is logged and the pass moves on to the next one. The
    session is committed once, after all automations have been visited.

    Args:
        session: Async SQLAlchemy session used for the query, the inserts
            and the final commit.

    Returns:
        Number of events created during this pass.
    """
    now = datetime.now(tz.utc)

    enabled_cron_query = select(Automation).where(
        Automation.enabled == True,  # noqa: E712
        Automation.trigger_type == 'cron',
    )
    automations = (await session.execute(enabled_cron_query)).scalars().all()

    events_created = 0
    for automation in automations:
        try:
            created = await _process_automation(session, automation, now)
        except Exception:
            # Deliberately broad: a single broken automation must not abort
            # the remainder of the pass. Inserts happen inside a savepoint in
            # _process_automation, so the outer session stays usable.
            logger.exception('Error processing automation %s', automation.id)
        else:
            events_created += created

    await session.commit()

    logger.info(
        'Scheduler run complete: %d events created from %d automations',
        events_created,
        len(automations),
    )
    return events_created
async def _process_automation(
    session: AsyncSession, automation: Automation, now: datetime
) -> int:
    """Check a single automation and insert an event if it is due.

    The next fire time is computed with croniter from ``last_triggered_at``
    (or ``created_at`` when the automation never fired), evaluated in the
    automation's configured timezone, and compared against ``now`` in UTC.

    Args:
        session: Async SQLAlchemy session; the insert runs inside a savepoint
            so a duplicate-key failure does not roll back earlier work.
        automation: The automation row to evaluate. Its ``last_triggered_at``
            is set to ``now`` when an event is created.
        now: The timezone-aware "current time" shared by the whole pass.

    Returns:
        1 if an event was created, 0 otherwise (no/invalid schedule, not yet
        due, or an event with the same dedup key already exists).
    """
    # Any level of the stored JSON may be missing or null; treat that the
    # same as "no cron trigger configured" instead of raising.
    triggers = (automation.config or {}).get('triggers') or {}
    cron_config = triggers.get('cron') or {}
    schedule = cron_config.get('schedule')
    # A null/empty timezone means "not specified", i.e. UTC.
    timezone_str = cron_config.get('timezone') or 'UTC'

    if not schedule:
        logger.warning('Automation %s has no cron schedule, skipping', automation.id)
        return 0

    if not croniter.is_valid(schedule):
        logger.warning(
            'Automation %s has invalid cron expression %r, skipping',
            automation.id,
            schedule,
        )
        return 0

    try:
        tz_info = ZoneInfo(timezone_str)
    except (ZoneInfoNotFoundError, KeyError, ValueError):
        # ZoneInfo raises ValueError (not ZoneInfoNotFoundError) for keys that
        # are not normalized relative paths, e.g. '../UTC' or '/etc/localtime'.
        logger.warning(
            'Automation %s has invalid timezone %r, falling back to UTC',
            automation.id,
            timezone_str,
        )
        tz_info = ZoneInfo('UTC')

    reference_time = automation.last_triggered_at or automation.created_at
    if reference_time.tzinfo is None:
        # Naive timestamps are interpreted as UTC.
        reference_time = reference_time.replace(tzinfo=tz.utc)

    # Compute the next run from the reference time in the automation's timezone.
    ref_in_tz = reference_time.astimezone(tz_info)
    next_run = croniter(schedule, ref_in_tz).get_next(datetime)

    # Compare in UTC.
    if next_run.tzinfo is None:
        next_run = next_run.replace(tzinfo=tz.utc)
    next_run_utc = next_run.astimezone(tz.utc)

    if next_run_utc > now:
        return 0

    # Automation is due — insert an event. The dedup key is scoped to the
    # automation and the current UTC minute.
    automation_id = str(automation.id)
    scheduled_minute = now.strftime('%Y-%m-%dT%H:%MZ')
    dedup_key = f'cron-{automation_id}-{scheduled_minute}'

    try:
        async with session.begin_nested():
            event = publish_automation_event(
                session=session,
                source_type='cron',
                payload={
                    'automation_id': automation_id,
                    'scheduled_time': now.isoformat(),
                },
                dedup_key=dedup_key,
                metadata={'cron_expression': schedule},
            )
            automation.last_triggered_at = now
            # Flush before notifying: the INSERT is what surfaces a duplicate
            # dedup key, and ORM-side primary-key defaults are applied at
            # flush time, so event.id is only reliable afterwards.
            # NOTE(review): assumes publish_automation_event only adds the
            # event to the session without flushing — confirm.
            await session.flush()
            pg_notify_new_event(session, event.id)

        logger.info('Created cron event for automation %s', automation_id)
        return 1
    except IntegrityError:
        # Only the nested transaction (savepoint) is rolled back — events
        # from previously-processed automations in the same run are preserved.
        logger.debug(
            'Dedup: event already exists for automation %s at %s',
            automation_id,
            scheduled_minute,
        )
        return 0
@@ -0,0 +1,596 @@
"""Unit tests for the automation scheduler.
Tests verify the core scheduler logic: cron evaluation, event creation,
idempotency, timezone handling, and error resilience.
"""
from __future__ import annotations
import sys
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any
from unittest.mock import MagicMock
import pytest
from sqlalchemy import JSON, Boolean, Column, DateTime, String, select
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase
# ---------------------------------------------------------------------------
# Stub models for Task 1 dependencies (Automation + AutomationEvent)
# These will be replaced by real imports once Task 1 is merged.
# ---------------------------------------------------------------------------
class _Base(DeclarativeBase):
    """Declarative base for the stub models defined in this test module."""

    pass
class Automation(_Base):
    """Stub ``automations`` table used in place of the real storage model."""

    __tablename__ = 'automations'
    # Primary key defaults to a random 32-character hex string.
    id = Column(String, primary_key=True, default=lambda: uuid.uuid4().hex)
    user_id = Column(String, nullable=False)
    org_id = Column(String, nullable=True)
    name = Column(String, nullable=False)
    enabled = Column(Boolean, nullable=False, default=True)
    # JSON document; the scheduler reads config['triggers']['cron'] from it.
    config = Column(JSON, nullable=False)
    trigger_type = Column(String, nullable=False)
    file_store_key = Column(String, nullable=False, default='')
    # NULL until the scheduler creates the first event for this automation.
    last_triggered_at = Column(DateTime(timezone=True), nullable=True)
    created_at = Column(
        DateTime(timezone=True),
        nullable=False,
        default=lambda: datetime.now(timezone.utc),
    )
    updated_at = Column(
        DateTime(timezone=True),
        nullable=False,
        default=lambda: datetime.now(timezone.utc),
    )
class AutomationEvent(_Base):
    """Stub ``automation_events`` table that receives the scheduler's events."""

    __tablename__ = 'automation_events'
    # Primary key defaults to a random 32-character hex string.
    id = Column(String, primary_key=True, default=lambda: uuid.uuid4().hex)
    source_type = Column(String, nullable=False)
    payload = Column(JSON, nullable=False)
    # Attribute is metadata_ while the column is named 'metadata'.
    metadata_ = Column('metadata', JSON, nullable=True)
    # Unique constraint: inserting a second row with the same key fails.
    dedup_key = Column(String, nullable=False, unique=True)
    status = Column(String, nullable=False, default='NEW')
    created_at = Column(
        DateTime(timezone=True),
        nullable=False,
        default=lambda: datetime.now(timezone.utc),
    )
# ---------------------------------------------------------------------------
# Mock the Task 1 modules so the scheduler can be imported
# ---------------------------------------------------------------------------
_published_events: list[AutomationEvent] = []
_notified_event_ids: list[Any] = []
def _fake_publish_automation_event(
    session: Any,
    source_type: str,
    payload: dict,
    dedup_key: str,
    metadata: dict | None = None,
) -> AutomationEvent:
    """Stand-in publisher: stage an event on the session and remember it.

    The event is added to ``session`` (not flushed) and appended to the
    module-level ``_published_events`` list so tests can inspect it.
    """
    event_fields = {
        'source_type': source_type,
        'payload': payload,
        'dedup_key': dedup_key,
        'metadata_': metadata,
    }
    staged_event = AutomationEvent(**event_fields)
    session.add(staged_event)
    _published_events.append(staged_event)
    return staged_event
def _fake_pg_notify(session: Any, event_id: Any) -> None:
    """Stand-in notifier: record ``event_id``; ``session`` is unused."""
    _notified_event_ids.append(event_id)
# Register stub modules before importing the scheduler, so that its
# `from storage.automation import ...` and
# `from services.automation_event_publisher import ...` statements resolve
# to the stubs below.
_mock_storage_automation = MagicMock()
_mock_storage_automation.Automation = Automation
_mock_publisher = MagicMock()
_mock_publisher.publish_automation_event = _fake_publish_automation_event
_mock_publisher.pg_notify_new_event = _fake_pg_notify
# setdefault only installs a stub when no module is registered under that
# name yet; an already-imported module of the same name is left in place.
sys.modules.setdefault('storage.automation', _mock_storage_automation)
sys.modules.setdefault('services.automation_event_publisher', _mock_publisher)
from services.automation_scheduler import run_scheduler # noqa: E402
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _reset_tracking():
    """Empty the publish/notify tracking lists before every test."""
    for tracker in (_published_events, _notified_event_ids):
        tracker.clear()
@pytest.fixture
async def async_engine(tmp_path):
    """Yield an async SQLite engine with the stub tables created.

    The database file lives under pytest's ``tmp_path``; the engine is
    disposed once the test is finished.
    """
    database_url = f'sqlite+aiosqlite:///{tmp_path / "test_scheduler.db"}'
    engine = create_async_engine(
        database_url,
        connect_args={'check_same_thread': False},
    )

    async with engine.begin() as connection:
        await connection.run_sync(_Base.metadata.create_all)

    yield engine

    await engine.dispose()
@pytest.fixture
async def async_session_maker(async_engine):
    """Return a session factory bound to the per-test engine.

    ``expire_on_commit=False`` keeps attributes of committed objects loaded,
    so tests can keep reading them after the session is closed.
    """
    session_factory = async_sessionmaker(
        bind=async_engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )
    return session_factory
def _make_automation(
    *,
    enabled: bool = True,
    trigger_type: str = 'cron',
    schedule: str = '*/5 * * * *',
    timezone_str: str = 'UTC',
    last_triggered_at: datetime | None = None,
    created_at: datetime | None = None,
) -> Automation:
    """Build an unsaved ``Automation`` with a fresh id for use in a test.

    For ``trigger_type == 'cron'`` the config carries the given schedule and
    timezone; for any other trigger type ``config['triggers']`` is empty.
    ``created_at`` defaults to the current UTC time.
    """
    if trigger_type == 'cron':
        triggers = {
            trigger_type: {
                'schedule': schedule,
                'timezone': timezone_str,
            }
        }
    else:
        triggers = {}

    if not created_at:
        created_at = datetime.now(timezone.utc)

    return Automation(
        id=uuid.uuid4().hex,
        user_id='user-1',
        name='test automation',
        enabled=enabled,
        config={'triggers': triggers},
        trigger_type=trigger_type,
        last_triggered_at=last_triggered_at,
        created_at=created_at,
    )
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestRunScheduler:
    """Tests for the run_scheduler function.

    The scheduler reads the wall clock itself, so every scenario is built
    relative to ``datetime.now``. "Not due" scenarios use a schedule whose
    next fire time is about half an hour away, so they cannot be tipped over
    by a cron boundary that happens to fall during the test.
    """

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    @staticmethod
    async def _persist(session_maker: async_sessionmaker, *objects: Any) -> None:
        """Commit the given ORM objects in a short-lived session."""
        async with session_maker() as session:
            session.add_all(objects)
            await session.commit()

    @staticmethod
    async def _run(session_maker: async_sessionmaker) -> int:
        """Run one scheduler pass in a fresh session; return its event count."""
        async with session_maker() as session:
            return await run_scheduler(session)

    @staticmethod
    def _schedule_far_from(now: datetime) -> str:
        """Return an hourly cron expression firing ~30 minutes after ``now``."""
        return f'{(now.minute + 30) % 60} * * * *'

    # ------------------------------------------------------------------
    # Due / not due
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_automation_is_due(self, async_session_maker):
        """An automation whose next fire time has passed should produce one event."""
        auto = _make_automation(
            schedule='*/5 * * * *',
            last_triggered_at=datetime.now(timezone.utc) - timedelta(minutes=10),
        )
        await self._persist(async_session_maker, auto)

        count = await self._run(async_session_maker)

        assert count == 1
        assert len(_published_events) == 1
        assert _published_events[0].source_type == 'cron'
        assert auto.id in _published_events[0].dedup_key

    @pytest.mark.asyncio
    async def test_automation_not_due(self, async_session_maker):
        """An automation that was just triggered should NOT produce an event."""
        now = datetime.now(timezone.utc)
        # With '*/5' and a 30-second-old trigger, a five-minute boundary can
        # fall inside that window and make the automation due; the hourly
        # schedule below keeps the next fire time well in the future.
        auto = _make_automation(
            schedule=self._schedule_far_from(now),
            last_triggered_at=now - timedelta(seconds=30),
        )
        await self._persist(async_session_maker, auto)

        count = await self._run(async_session_maker)

        assert count == 0
        assert len(_published_events) == 0

    @pytest.mark.asyncio
    async def test_disabled_automations_skipped(self, async_session_maker):
        """Disabled automations must not produce events."""
        auto = _make_automation(
            enabled=False,
            schedule='* * * * *',
            last_triggered_at=datetime.now(timezone.utc) - timedelta(hours=1),
        )
        await self._persist(async_session_maker, auto)

        assert await self._run(async_session_maker) == 0

    @pytest.mark.asyncio
    async def test_non_cron_automations_skipped(self, async_session_maker):
        """Automations with trigger_type != 'cron' must not be processed."""
        auto = _make_automation(
            trigger_type='github',
            last_triggered_at=datetime.now(timezone.utc) - timedelta(hours=1),
        )
        await self._persist(async_session_maker, auto)

        assert await self._run(async_session_maker) == 0

    @pytest.mark.asyncio
    async def test_idempotency_same_minute(self, async_session_maker):
        """Running the scheduler twice in the same minute produces exactly one event.

        The first run sets last_triggered_at to its "now", so on the second
        run the next fire time of the every-minute schedule is still in the
        future and the automation is simply not due. The dedup_key /
        IntegrityError path is exercised separately by
        test_integrity_error_dedup_in_same_session.
        """
        auto = _make_automation(
            schedule='* * * * *',
            last_triggered_at=datetime.now(timezone.utc) - timedelta(minutes=5),
        )
        await self._persist(async_session_maker, auto)

        first_count = await self._run(async_session_maker)
        assert first_count == 1

        # NOTE: assumes both runs land in the same wall-clock minute.
        second_count = await self._run(async_session_maker)
        assert second_count == 0

    @pytest.mark.asyncio
    async def test_timezone_handling(self, async_session_maker):
        """Timezone-aware schedules should be evaluated correctly."""
        auto = _make_automation(
            schedule='*/5 * * * *',
            timezone_str='America/New_York',
            last_triggered_at=datetime.now(timezone.utc) - timedelta(minutes=10),
        )
        await self._persist(async_session_maker, auto)

        count = await self._run(async_session_maker)

        assert count == 1
        assert len(_published_events) == 1

    @pytest.mark.asyncio
    async def test_last_triggered_at_updated(self, async_session_maker):
        """After creating an event, last_triggered_at must be updated."""
        old_time = datetime.now(timezone.utc) - timedelta(hours=1)
        auto = _make_automation(
            schedule='*/5 * * * *',
            last_triggered_at=old_time,
        )
        auto_id = auto.id
        await self._persist(async_session_maker, auto)

        assert await self._run(async_session_maker) == 1

        # Re-read from the database to verify the update was committed.
        async with async_session_maker() as session:
            result = await session.get(Automation, auto_id)
            assert result.last_triggered_at is not None
            # SQLite strips tz info; compare naive-to-naive.
            result_ts = result.last_triggered_at.replace(tzinfo=None)
            assert result_ts > old_time.replace(tzinfo=None)

    # ------------------------------------------------------------------
    # Invalid / missing configuration
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_invalid_cron_expression(self, async_session_maker):
        """An automation with an invalid cron expression should be skipped, not crash."""
        auto = _make_automation(
            schedule='not-a-cron',
            last_triggered_at=datetime.now(timezone.utc) - timedelta(hours=1),
        )
        await self._persist(async_session_maker, auto)

        count = await self._run(async_session_maker)

        assert count == 0
        assert len(_published_events) == 0

    @pytest.mark.asyncio
    async def test_missing_cron_schedule(self, async_session_maker):
        """An automation with empty cron config should be skipped."""
        auto = Automation(
            id=uuid.uuid4().hex,
            user_id='user-1',
            name='empty cron',
            enabled=True,
            config={'triggers': {'cron': {}}},
            trigger_type='cron',
            created_at=datetime.now(timezone.utc) - timedelta(hours=1),
        )
        await self._persist(async_session_maker, auto)

        assert await self._run(async_session_maker) == 0

    @pytest.mark.asyncio
    async def test_never_triggered_uses_created_at(self, async_session_maker):
        """When last_triggered_at is None, the scheduler falls back to created_at."""
        auto = _make_automation(
            schedule='*/5 * * * *',
            last_triggered_at=None,
            created_at=datetime.now(timezone.utc) - timedelta(hours=1),
        )
        await self._persist(async_session_maker, auto)

        assert await self._run(async_session_maker) == 1

    @pytest.mark.asyncio
    async def test_invalid_timezone_falls_back_to_utc(self, async_session_maker):
        """An invalid timezone should fall back to UTC and still work."""
        auto = _make_automation(
            schedule='*/5 * * * *',
            timezone_str='Invalid/Timezone',
            last_triggered_at=datetime.now(timezone.utc) - timedelta(minutes=10),
        )
        await self._persist(async_session_maker, auto)

        assert await self._run(async_session_maker) == 1

    # ------------------------------------------------------------------
    # Multiple automations, notification, event contents
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_multiple_automations(self, async_session_maker):
        """Multiple due automations should each get their own event."""
        now = datetime.now(timezone.utc)
        auto1 = _make_automation(
            schedule='*/5 * * * *',
            last_triggered_at=now - timedelta(minutes=10),
        )
        auto2 = _make_automation(
            schedule='*/5 * * * *',
            last_triggered_at=now - timedelta(minutes=10),
        )
        # Hourly schedule ~30 minutes away: never due, regardless of where
        # the five-minute boundaries fall relative to the test run.
        auto_not_due = _make_automation(
            schedule=self._schedule_far_from(now),
            last_triggered_at=now - timedelta(seconds=30),
        )
        await self._persist(async_session_maker, auto1, auto2, auto_not_due)

        count = await self._run(async_session_maker)

        assert count == 2
        assert len(_published_events) == 2

    @pytest.mark.asyncio
    async def test_pg_notify_called(self, async_session_maker):
        """pg_notify_new_event should be called for each created event."""
        auto = _make_automation(
            schedule='*/5 * * * *',
            last_triggered_at=datetime.now(timezone.utc) - timedelta(minutes=10),
        )
        await self._persist(async_session_maker, auto)

        await self._run(async_session_maker)

        assert len(_notified_event_ids) == 1

    @pytest.mark.asyncio
    async def test_dedup_key_format(self, async_session_maker):
        """The dedup_key must follow the format 'cron-{automation_id}-{minute}'."""
        auto = _make_automation(
            schedule='* * * * *',
            last_triggered_at=datetime.now(timezone.utc) - timedelta(minutes=5),
        )
        await self._persist(async_session_maker, auto)

        await self._run(async_session_maker)

        assert len(_published_events) == 1
        dedup = _published_events[0].dedup_key
        prefix = f'cron-{auto.id}-'
        assert dedup.startswith(prefix)
        # The minute portion should look like an ISO-ish timestamp ending with Z.
        minute_part = dedup.split(prefix)[1]
        assert minute_part.endswith('Z')

    @pytest.mark.asyncio
    async def test_no_automations(self, async_session_maker):
        """Running on an empty table should succeed with 0 events."""
        assert await self._run(async_session_maker) == 0

    @pytest.mark.asyncio
    async def test_event_payload_contains_automation_id(self, async_session_maker):
        """The event payload must include automation_id and scheduled_time."""
        auto = _make_automation(
            schedule='*/5 * * * *',
            last_triggered_at=datetime.now(timezone.utc) - timedelta(minutes=10),
        )
        await self._persist(async_session_maker, auto)

        await self._run(async_session_maker)

        payload = _published_events[0].payload
        assert payload['automation_id'] == auto.id
        assert 'scheduled_time' in payload

    # ------------------------------------------------------------------
    # Dedup via the unique dedup_key constraint
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_integrity_error_dedup_in_same_session(self, async_session_maker):
        """Pre-inserted event with the same dedup_key triggers IntegrityError.

        The savepoint should roll back only the duplicate insert; the scheduler
        should return 0 without corrupting the session.
        """
        now = datetime.now(timezone.utc)
        auto = _make_automation(
            schedule='* * * * *',  # every minute — always due
            last_triggered_at=now - timedelta(minutes=5),
        )

        # Pre-compute the dedup_key the scheduler will generate (assumes the
        # scheduler runs within the same wall-clock minute as ``now``).
        scheduled_minute = now.strftime('%Y-%m-%dT%H:%MZ')
        dedup_key = f'cron-{auto.id}-{scheduled_minute}'

        # Pre-insert an event with the same dedup_key.
        existing_event = AutomationEvent(
            source_type='cron',
            payload={'automation_id': auto.id},
            dedup_key=dedup_key,
        )
        await self._persist(async_session_maker, auto, existing_event)

        # Scheduler tries to insert the same dedup_key → IntegrityError.
        assert await self._run(async_session_maker) == 0

        # The fake publisher still appended to its tracking list before the
        # flush raised IntegrityError; what matters is that the database only
        # holds the original event.
        async with async_session_maker() as session:
            result = await session.execute(
                select(AutomationEvent).where(
                    AutomationEvent.dedup_key == dedup_key
                )
            )
            assert len(result.scalars().all()) == 1

    @pytest.mark.asyncio
    async def test_savepoint_preserves_other_automations_on_dedup(
        self, async_session_maker
    ):
        """When one automation hits IntegrityError, others must not be rolled back.

        This verifies the begin_nested() savepoint isolation: automation B's
        event should be preserved even though automation A's insert fails.
        """
        now = datetime.now(timezone.utc)
        auto_a = _make_automation(
            schedule='* * * * *',
            last_triggered_at=now - timedelta(minutes=5),
        )
        auto_b = _make_automation(
            schedule='* * * * *',
            last_triggered_at=now - timedelta(minutes=5),
        )

        # Pre-insert a conflicting event for automation A only.
        scheduled_minute = now.strftime('%Y-%m-%dT%H:%MZ')
        dedup_key_a = f'cron-{auto_a.id}-{scheduled_minute}'
        existing_event = AutomationEvent(
            source_type='cron',
            payload={'automation_id': auto_a.id},
            dedup_key=dedup_key_a,
        )
        await self._persist(async_session_maker, auto_a, auto_b, existing_event)

        # Only automation B should have produced an event.
        assert await self._run(async_session_maker) == 1

        dedup_key_b = f'cron-{auto_b.id}-{scheduled_minute}'
        async with async_session_maker() as session:
            # Automation B's event was committed to the database.
            result = await session.execute(
                select(AutomationEvent).where(
                    AutomationEvent.dedup_key == dedup_key_b
                )
            )
            assert len(result.scalars().all()) == 1

            # Automation B's last_triggered_at was persisted.
            result_b = await session.get(Automation, auto_b.id)
            assert result_b.last_triggered_at is not None

            # Automation A's last_triggered_at was NOT updated (savepoint
            # rolled back), so it still holds the original, older value.
            result_a = await session.get(Automation, auto_a.id)
            ts = result_a.last_triggered_at
            if ts is not None and ts.tzinfo is None:
                ts = ts.replace(tzinfo=timezone.utc)
            assert ts is not None
            assert ts < now