Fix dictionary changed size during iteration error in EventStream (#7984)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Rohit Malhotra
2025-04-21 16:21:30 -04:00
committed by GitHub
parent 52d881c98d
commit 2514b200c5
2 changed files with 78 additions and 6 deletions

View File

@@ -232,11 +232,17 @@ class EventStream(EventStore):
# pass each event to each callback in order
for key in sorted(self._subscribers.keys()):
callbacks = self._subscribers[key]
for callback_id in callbacks:
callback = callbacks[callback_id]
pool = self._thread_pools[key][callback_id]
future = pool.submit(callback, event)
future.add_done_callback(self._make_error_handler(callback_id, key))
# Create a copy of the keys to avoid "dictionary changed size during iteration" error
callback_ids = list(callbacks.keys())
for callback_id in callback_ids:
# Check if callback_id still exists (might have been removed during iteration)
if callback_id in callbacks:
callback = callbacks[callback_id]
pool = self._thread_pools[key][callback_id]
future = pool.submit(callback, event)
future.add_done_callback(
self._make_error_handler(callback_id, key)
)
def _make_error_handler(
self, callback_id: str, subscriber_id: str

View File

@@ -8,7 +8,7 @@ import pytest
from pytest import TempPathFactory
from openhands.core.schema import ActionType, ObservationType
from openhands.events import EventSource, EventStream
from openhands.events import EventSource, EventStream, EventStreamSubscriber
from openhands.events.action import (
NullAction,
)
@@ -444,6 +444,72 @@ def test_cache_page_performance(temp_dir: str):
# In real-world scenarios with many more events, the performance difference would be more significant.
def test_callback_dictionary_modification(temp_dir: str):
"""Test that the event stream can handle dictionary modification during iteration.
This test verifies that the fix for the 'dictionary changed size during iteration' error works.
The test adds a callback that adds a new callback during iteration, which would cause an error
without the fix.
"""
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('callback_test', file_store)
# Track callback execution
callback_executed = [False, False, False]
# Define a callback that will be added during iteration
def callback_added_during_iteration(event):
callback_executed[2] = True
# First callback that will be called
def callback1(event):
callback_executed[0] = True
# This callback will add a new callback during iteration
# Without our fix, this would cause a "dictionary changed size during iteration" error
event_stream.subscribe(
EventStreamSubscriber.TEST, callback_added_during_iteration, 'callback3'
)
# Second callback that will be called
def callback2(event):
callback_executed[1] = True
# Subscribe both callbacks
event_stream.subscribe(EventStreamSubscriber.TEST, callback1, 'callback1')
event_stream.subscribe(EventStreamSubscriber.TEST, callback2, 'callback2')
# Add an event to trigger callbacks
event_stream.add_event(NullObservation('test'), EventSource.AGENT)
# Give some time for the callbacks to execute
time.sleep(0.5)
# Verify that the first two callbacks were executed
assert callback_executed[0] is True, 'First callback should have been executed'
assert callback_executed[1] is True, 'Second callback should have been executed'
# The third callback should not have been executed for this event
# since it was added during iteration
assert (
callback_executed[2] is False
), 'Third callback should not have been executed for this event'
# Add another event to trigger all callbacks including the newly added one
callback_executed = [False, False, False] # Reset execution tracking
event_stream.add_event(NullObservation('test2'), EventSource.AGENT)
# Give some time for the callbacks to execute
time.sleep(0.5)
# Now all three callbacks should have been executed
assert callback_executed[0] is True, 'First callback should have been executed'
assert callback_executed[1] is True, 'Second callback should have been executed'
assert callback_executed[2] is True, 'Third callback should have been executed'
# Clean up
event_stream.close()
def test_cache_page_partial_retrieval(temp_dir: str):
"""Test retrieving events with start_id and end_id parameters using the cache."""
file_store = get_file_store('local', temp_dir)