mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
1 Commits
feat/lamin
...
fix/slack-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6bb74a4210 |
@@ -319,12 +319,17 @@ class SlackManager(Manager):
|
||||
# Summaries are generated for every messages anyways, we only need to do
|
||||
# this subscription once for the event which kicked off the job.
|
||||
|
||||
# Get the initial message content to track for privacy
|
||||
# (so we only respond to Slack when last message came from Slack)
|
||||
user_instructions, _ = slack_view._get_instructions(self.jinja_env)
|
||||
|
||||
processor = SlackCallbackProcessor(
|
||||
slack_user_id=slack_view.slack_user_id,
|
||||
channel_id=slack_view.channel_id,
|
||||
message_ts=slack_view.message_ts,
|
||||
thread_ts=slack_view.thread_ts,
|
||||
team_id=slack_view.team_id,
|
||||
last_slack_message_content=user_instructions,
|
||||
)
|
||||
|
||||
# Register the callback processor
|
||||
|
||||
@@ -12,6 +12,7 @@ from integrations.utils import (
|
||||
get_user_v1_enabled_setting,
|
||||
)
|
||||
from jinja2 import Environment
|
||||
from server.utils.conversation_callback_utils import update_callback_processor
|
||||
from slack_sdk import WebClient
|
||||
from storage.slack_conversation import SlackConversation
|
||||
from storage.slack_conversation_store import SlackConversationStore
|
||||
@@ -402,6 +403,19 @@ class SlackUpdateExistingConversationView(SlackNewConversationView):
|
||||
raise StartingConvoException('Conversation is still starting')
|
||||
|
||||
instructions, _ = self._get_instructions(jinja)
|
||||
|
||||
# Update the Slack callback processor with the message content so it knows
|
||||
# this message originated from Slack (for privacy: don't respond to Slack
|
||||
# if user later continues conversation via Web UI)
|
||||
def set_slack_message_content(processor):
|
||||
processor.last_slack_message_content = instructions
|
||||
|
||||
update_callback_processor(
|
||||
self.conversation_id,
|
||||
'SlackCallbackProcessor',
|
||||
set_slack_message_content,
|
||||
)
|
||||
|
||||
user_msg = MessageAction(content=instructions)
|
||||
await conversation_manager.send_event_to_conversation(
|
||||
self.conversation_id, event_to_dict(user_msg)
|
||||
|
||||
@@ -31,6 +31,11 @@ class SlackCallbackProcessor(ConversationCallbackProcessor):
|
||||
|
||||
This processor is used to send summaries of conversations to Slack channels
|
||||
when agent state changes occur.
|
||||
|
||||
The processor tracks the content of the last message sent from Slack to ensure
|
||||
responses are only sent back to Slack when the user's last message originated
|
||||
from Slack. This prevents privacy issues where a user who continues a conversation
|
||||
via the Web UI would have their messages echoed back to Slack.
|
||||
"""
|
||||
|
||||
slack_user_id: str
|
||||
@@ -39,6 +44,7 @@ class SlackCallbackProcessor(ConversationCallbackProcessor):
|
||||
thread_ts: str | None
|
||||
team_id: str
|
||||
last_user_msg_id: int | None = None
|
||||
last_slack_message_content: str | None = None
|
||||
|
||||
async def _send_message_to_slack(self, message: str) -> None:
|
||||
"""
|
||||
@@ -139,6 +145,30 @@ class SlackCallbackProcessor(ConversationCallbackProcessor):
|
||||
)
|
||||
return
|
||||
|
||||
# Privacy check: Only respond to Slack if the last message originated from Slack.
|
||||
# If user continued conversation via Web UI, don't send response back to Slack.
|
||||
current_msg_content = last_user_msg[0].content if last_user_msg else None
|
||||
if (
|
||||
self.last_slack_message_content is not None
|
||||
and current_msg_content != self.last_slack_message_content
|
||||
and current_msg_content != summary_instruction
|
||||
):
|
||||
logger.info(
|
||||
'[Slack] Skipping Slack response - last message did not originate from Slack',
|
||||
extra={
|
||||
'conversation_id': conversation_id,
|
||||
'current_msg_content_preview': (
|
||||
current_msg_content[:100] if current_msg_content else None
|
||||
),
|
||||
'last_slack_message_content_preview': (
|
||||
self.last_slack_message_content[:100]
|
||||
if self.last_slack_message_content
|
||||
else None
|
||||
),
|
||||
},
|
||||
)
|
||||
return
|
||||
|
||||
# Update the last user message ID
|
||||
self.last_user_msg_id = current_msg_id
|
||||
|
||||
|
||||
@@ -179,6 +179,60 @@ def register_callback_processor(
|
||||
return callback.id
|
||||
|
||||
|
||||
def update_callback_processor(
|
||||
conversation_id: str,
|
||||
processor_type: str,
|
||||
update_fn: callable,
|
||||
) -> bool:
|
||||
"""
|
||||
Update an active callback processor for a conversation.
|
||||
|
||||
This function finds an active callback of the specified type and applies
|
||||
the update function to its processor. The updated processor is then saved
|
||||
back to the database.
|
||||
|
||||
Args:
|
||||
conversation_id: The conversation ID to update the callback for
|
||||
processor_type: The type of processor to update (e.g., 'SlackCallbackProcessor')
|
||||
update_fn: A function that takes a processor and modifies it in place
|
||||
|
||||
Returns:
|
||||
bool: True if a callback was found and updated, False otherwise
|
||||
"""
|
||||
with session_maker() as session:
|
||||
callbacks = (
|
||||
session.query(ConversationCallback)
|
||||
.filter(
|
||||
ConversationCallback.conversation_id == conversation_id,
|
||||
ConversationCallback.status == CallbackStatus.ACTIVE,
|
||||
ConversationCallback.processor_type.contains(processor_type),
|
||||
)
|
||||
.all()
|
||||
)
|
||||
|
||||
if not callbacks:
|
||||
return False
|
||||
|
||||
for callback in callbacks:
|
||||
try:
|
||||
processor = callback.get_processor()
|
||||
update_fn(processor)
|
||||
callback.set_processor(processor)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
'callback_update_failed',
|
||||
extra={
|
||||
'conversation_id': conversation_id,
|
||||
'callback_id': callback.id,
|
||||
'processor_type': callback.processor_type,
|
||||
'error': str(e),
|
||||
},
|
||||
)
|
||||
|
||||
session.commit()
|
||||
return True
|
||||
|
||||
|
||||
def update_active_working_seconds(
|
||||
event_store: EventStore, conversation_id: str, user_id: str, file_store: FileStore
|
||||
):
|
||||
|
||||
@@ -459,3 +459,140 @@ class TestSlackCallbackProcessor:
|
||||
assert 'message_ts' in callback.processor_json
|
||||
assert 'thread_ts' in callback.processor_json
|
||||
assert 'team_id' in callback.processor_json
|
||||
|
||||
@patch(
|
||||
'server.conversation_callback_processor.slack_callback_processor.get_summary_instruction'
|
||||
)
|
||||
@patch(
|
||||
'server.conversation_callback_processor.slack_callback_processor.get_last_user_msg_from_conversation_manager'
|
||||
)
|
||||
@patch('server.conversation_callback_processor.slack_callback_processor.logger')
|
||||
async def test_call_skips_non_slack_originated_message(
|
||||
self,
|
||||
mock_logger,
|
||||
mock_get_last_user_msg,
|
||||
mock_get_summary_instruction,
|
||||
agent_state_changed_observation,
|
||||
conversation_callback,
|
||||
):
|
||||
"""Test that the callback skips processing when last message didn't originate from Slack.
|
||||
|
||||
This is a privacy feature: if the user starts a conversation via Slack but then
|
||||
continues via the Web UI, responses should NOT be sent back to Slack.
|
||||
"""
|
||||
# Create processor with last_slack_message_content set
|
||||
# (simulating that we know what message was sent from Slack)
|
||||
processor = SlackCallbackProcessor(
|
||||
slack_user_id='test_user',
|
||||
channel_id='test_channel',
|
||||
message_ts='test_message_ts',
|
||||
thread_ts='test_thread_ts',
|
||||
team_id='test_team_id',
|
||||
last_slack_message_content='Message from Slack',
|
||||
)
|
||||
|
||||
# Setup mocks - the last user message is different (came from Web UI)
|
||||
mock_get_summary_instruction.return_value = 'Please summarize this conversation.'
|
||||
mock_last_msg = MagicMock()
|
||||
mock_last_msg.id = 128
|
||||
mock_last_msg.content = 'Message from Web UI' # Different from Slack message
|
||||
mock_get_last_user_msg.return_value = [mock_last_msg]
|
||||
|
||||
# Call the method
|
||||
await processor(
|
||||
callback=conversation_callback,
|
||||
observation=agent_state_changed_observation,
|
||||
)
|
||||
|
||||
# Verify that we returned early - the callback should NOT be updated
|
||||
conversation_callback.set_processor.assert_not_called()
|
||||
|
||||
# Verify the log message was recorded
|
||||
mock_logger.info.assert_any_call(
|
||||
'[Slack] Skipping Slack response - last message did not originate from Slack',
|
||||
extra={
|
||||
'conversation_id': conversation_callback.conversation_id,
|
||||
'current_msg_content_preview': 'Message from Web UI',
|
||||
'last_slack_message_content_preview': 'Message from Slack',
|
||||
},
|
||||
)
|
||||
|
||||
@patch(
|
||||
'server.conversation_callback_processor.slack_callback_processor.get_summary_instruction'
|
||||
)
|
||||
@patch(
|
||||
'server.conversation_callback_processor.slack_callback_processor.conversation_manager'
|
||||
)
|
||||
@patch(
|
||||
'server.conversation_callback_processor.slack_callback_processor.get_last_user_msg_from_conversation_manager'
|
||||
)
|
||||
@patch(
|
||||
'server.conversation_callback_processor.slack_callback_processor.event_to_dict'
|
||||
)
|
||||
async def test_call_processes_slack_originated_message(
|
||||
self,
|
||||
mock_event_to_dict,
|
||||
mock_get_last_user_msg,
|
||||
mock_conversation_manager,
|
||||
mock_get_summary_instruction,
|
||||
agent_state_changed_observation,
|
||||
conversation_callback,
|
||||
):
|
||||
"""Test that the callback processes messages that originated from Slack."""
|
||||
# Create processor with last_slack_message_content set
|
||||
slack_message = 'Message from Slack'
|
||||
processor = SlackCallbackProcessor(
|
||||
slack_user_id='test_user',
|
||||
channel_id='test_channel',
|
||||
message_ts='test_message_ts',
|
||||
thread_ts='test_thread_ts',
|
||||
team_id='test_team_id',
|
||||
last_slack_message_content=slack_message,
|
||||
)
|
||||
|
||||
# Setup mocks - the last user message matches what was sent from Slack
|
||||
mock_get_summary_instruction.return_value = 'Please summarize this conversation.'
|
||||
mock_last_msg = MagicMock()
|
||||
mock_last_msg.id = 129
|
||||
mock_last_msg.content = slack_message # Same as Slack message
|
||||
mock_get_last_user_msg.return_value = [mock_last_msg]
|
||||
mock_conversation_manager.send_event_to_conversation = AsyncMock()
|
||||
mock_event_to_dict.return_value = {
|
||||
'type': 'message_action',
|
||||
'content': 'Please summarize this conversation.',
|
||||
}
|
||||
|
||||
# Call the method
|
||||
await processor(
|
||||
callback=conversation_callback,
|
||||
observation=agent_state_changed_observation,
|
||||
)
|
||||
|
||||
# Verify that processing continued - the summary instruction was sent
|
||||
mock_conversation_manager.send_event_to_conversation.assert_called_once()
|
||||
|
||||
# Verify the callback was updated
|
||||
conversation_callback.set_processor.assert_called_once()
|
||||
|
||||
def test_last_slack_message_content_serialization(self):
|
||||
"""Test that last_slack_message_content is properly serialized/deserialized."""
|
||||
original_processor = SlackCallbackProcessor(
|
||||
slack_user_id='test_user',
|
||||
channel_id='test_channel',
|
||||
message_ts='test_message_ts',
|
||||
thread_ts='test_thread_ts',
|
||||
team_id='test_team_id',
|
||||
last_slack_message_content='Test Slack message content',
|
||||
)
|
||||
|
||||
# Serialize to JSON
|
||||
json_data = original_processor.model_dump_json()
|
||||
|
||||
# Deserialize from JSON
|
||||
deserialized_processor = SlackCallbackProcessor.model_validate_json(json_data)
|
||||
|
||||
# Verify the field is preserved
|
||||
assert (
|
||||
deserialized_processor.last_slack_message_content
|
||||
== original_processor.last_slack_message_content
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user