Update prototype.py

This commit is contained in:
Jack Gerrits
2024-05-08 23:52:18 -04:00
committed by GitHub
parent 0c08ca30a1
commit 30e06cee10

View File

@@ -1,156 +1,252 @@
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Mapping, Protocol, Sequence
from typing import Any, Awaitable, Callable, Dict, List, Optional, Protocol, Sequence, Type, cast
import asyncio
from enum import Enum
# Type based routing for events
from abc import ABC, abstractmethod
class EventKind(Enum):
INPUT = "input"
GROUP_CHAT = "GROUP_CHAT"
import random
@dataclass
class Event:
recipient: str
payload: str
kind: EventKind
# Type based routing for event
# -metadata
# Receipt
# Request response
# Type/Kind
# DELIVERY RECEIPTS
# on event
# on event with receipt
class Event(Protocol):
sender: str
request_reply: bool = False
class EventBasedAgent(Protocol):
# T must encompass all subscribed types for a given agent
class Agent(Protocol):
@property
def name(self) -> str:
...
async def on_event(self, event: Event) -> None:
class EventBasedAgent[T: Event](Agent):
@property
def subscriptions(self) -> Sequence[Type[T]]:
...
async def on_event(self, event: T) -> None:
...
class EventRoutingAgent(EventBasedAgent):
# NOTE: this works on concrete types and not inheritance
def event_handler[T: Event](target_type: Type[T]):
def decorator(func: Callable[..., Awaitable[None]]) -> Callable[..., Awaitable[None]]:
func._target_type = target_type # type: ignore
return func
return decorator
def __init__(self, name: str, emit_event: Callable[[Event], None], handlers: Mapping[EventKind, Callable[[Event], Awaitable[None]]]) -> None:
class TypeRoutedAgent[T: Event](EventBasedAgent[T], ABC):
def __init__(self, name: str, emit_event: Callable[[T], Awaitable[None]]) -> None:
self._name = name
self._handlers: Dict[Type[Any], Callable[[T], Awaitable[None]]] = {}
self._emit_event = emit_event
self._emit_event = emit_event
self._handlers = handlers
for attr in dir(self):
if callable(getattr(self, attr)):
handler = getattr(self, attr)
if hasattr(handler, "_target_type"):
# TODO do i need to partially apply self?
self._handlers[handler._target_type] = handler
@property
def name(self) -> str:
return self._name
async def on_event(self, event: Event) -> None:
handler = self._handlers.get(event.kind)
@property
def subscriptions(self) -> Sequence[Type[T]]:
return list(self._handlers.keys())
async def emit_event(self, event: T) -> None:
await self._emit_event(event)
async def on_event(self, event: T) -> None:
handler = self._handlers.get(type(event))
if handler is not None:
await handler(event)
else:
print(f"No handler for event {event}")
await self.on_unhandled_event(event)
class CapitilizationAgent(EventBasedAgent):
def __init__(self, name: str, emit_event: Callable[[Event], None]) -> None:
self._name = name
self._emit_event = emit_event
@property
def name(self) -> str:
return self._name
@abstractmethod
async def on_unhandled_event(self, event: T) -> None:
...
async def on_event(self, event: Event) -> None:
payload = event.payload
capitalized_payload = payload.upper()
self._emit_event(Event(recipient=event.sender, payload=capitalized_payload, kind=event.kind, sender=self._name))
@dataclass
class InputEvent(Event):
message: str
sender: str
class GroupChatManager(EventRoutingAgent):
def __init__(self, name: str, emit_event: Callable[[Event], None], agents: Sequence[EventBasedAgent]) -> None:
@dataclass
class NewEvent(Event):
message: str
sender: str
recipient: str
super().__init__(name, emit_event, {
EventKind.INPUT: self.on_input_event,
EventKind.GROUP_CHAT: self.on_group_chat_event
})
@dataclass
class ResponseEvent(Event):
message: Optional[str]
sender: str
GroupChatEvents = InputEvent | NewEvent | ResponseEvent
class GroupChatManager(TypeRoutedAgent[GroupChatEvents]):
def __init__(self, name: str, emit_event: Callable[[GroupChatEvents], Awaitable[None]], agents: Sequence[Agent]) -> None:
super().__init__(name, emit_event)
self._agents = agents
self._current_speaker = 0
self._events: List[Event] = []
self._events: List[GroupChatEvents] = []
self._responses: List[ResponseEvent] = []
@property
def name(self) -> str:
return self._name
def broadcast(self, event: Event, next_speaker: EventBasedAgent) -> None:
for agent in self._agents:
event.recipient = agent.name
event.request_reply = agent == next_speaker
self._emit_event(event)
async def on_input_event(self, event: Event) -> None:
@event_handler(InputEvent)
async def on_input_event(self, event: InputEvent) -> None:
# New group chat
self._events.clear()
recipient_agent = self._agents[self._current_speaker]
self._current_speaker = (self._current_speaker + 1) % len(self._agents)
event.sender = self._name
new_event = NewEvent(message=event.message, sender=self.name, recipient=recipient_agent.name)
self._events.append(event)
self.broadcast(event, recipient_agent)
await self.emit_event(new_event)
async def on_group_chat_event(self, event: Event) -> None:
# Handle termination and replying to original sender
recipient_agent = self._agents[self._current_speaker]
self._current_speaker = (self._current_speaker + 1) % len(self._agents)
event.sender = self._name
self._events.append(event)
self.broadcast(event, recipient_agent)
@event_handler(ResponseEvent)
async def on_group_chat_event(self, event: ResponseEvent) -> None:
self._responses.append(event)
# TODO: Handle termination and replying to original sender
# Received response from all - proceeed
if len(self._responses) == len(self._agents):
recipient_agent = self._agents[self._current_speaker]
self._current_speaker = (self._current_speaker + 1) % len(self._agents)
responses_with_content = [x for x in self._responses if x.message is not None]
if len(responses_with_content) != 1:
raise ValueError("Can't handle anything other than 1 response right now.")
new_event = NewEvent(message=cast(str, responses_with_content[0].message), sender=self.name, recipient=recipient_agent.name)
self._events.append(new_event)
self._responses.clear()
await self.emit_event(new_event)
async def on_unhandled_event(self, event: GroupChatEvents) -> None:
raise ValueError("Unknown")
class Critic(TypeRoutedAgent[GroupChatEvents]):
def __init__(self, name: str, emit_event: Callable[[GroupChatEvents], Awaitable[None]]) -> None:
super().__init__(name, emit_event)
@event_handler(NewEvent)
async def on_new_event(self, event: NewEvent) -> None:
if event.recipient == self.name:
response = random.choice([" is a good idea", " is a bad idea"])
await self.emit_event(ResponseEvent(event.message + response, sender=self.name))
else:
await self.emit_event(ResponseEvent(None, sender=self.name))
class EventQueue:
async def on_unhandled_event(self, event: GroupChatEvents) -> None:
raise ValueError("Unknown")
class Suggester(TypeRoutedAgent[GroupChatEvents]):
def __init__(self, name: str, emit_event: Callable[[GroupChatEvents], Awaitable[None]]) -> None:
super().__init__(name, emit_event)
@event_handler(NewEvent)
async def on_new_event(self, event: NewEvent) -> None:
if event.recipient == self.name:
response = random.choice(["Attach wheels to a laptop", "merge a banana and an apple", "Cheese but made with oats"])
await self.emit_event(ResponseEvent(response, sender=self.name))
else:
await self.emit_event(ResponseEvent(None, sender=self.name))
async def on_unhandled_event(self, event: GroupChatEvents) -> None:
raise ValueError("Unknown")
class EventQueue[U]:
def __init__(self) -> None:
self._queue: List[Event] = []
self._queue: List[U] = []
def emit(self, event: Event) -> None:
async def emit(self, event: U) -> None:
print(event)
self._queue.append(event)
def pop_event(self) -> Event:
def pop_event(self) -> U:
return self._queue.pop(0)
def empty(self) -> bool:
return len(self._queue) == 0
def into_callable(self) -> Callable[[Event], None]:
def into_callable(self) -> Callable[[U], Awaitable[None]]:
return self.emit
class EventQueueProcessor():
def __init__(self, event_queue: EventQueue, agents: Sequence[EventBasedAgent]) -> None:
class EventRouter[T: Event]():
def __init__(self, event_queue: EventQueue[T], agents: Sequence[EventBasedAgent[T]]) -> None:
self._event_queue = event_queue
self._agent_map = {agent.name: agent for agent in agents}
# Use default dict i just cant remember the syntax and im without internet
self._per_type_subscribers: Dict[Type[T], List[EventBasedAgent[T]]] = {}
for agent in agents:
subscriptions = agent.subscriptions
for subscription in subscriptions:
if subscription not in self._per_type_subscribers:
self._per_type_subscribers[subscription] = []
self._per_type_subscribers[subscription].append(agent)
async def process_next(self) -> None:
if self._event_queue.empty():
return
event = self._event_queue.pop_event()
recipient = event.recipient
if recipient in self._agent_map:
agent = self._agent_map[recipient]
await agent.on_event(event)
subscribers = self._per_type_subscribers.get(type(event))
if subscribers is not None:
for subscriber in subscribers:
await subscriber.on_event(event)
else:
print(f"Event {event} has no recipient agent")
async def main():
event_queue = EventQueue()
agents = [
CapitilizationAgent("CapitilizationAgent", event_queue.into_callable())
]
processor = EventQueueProcessor(event_queue, agents)
event_queue = EventQueue[GroupChatEvents]()
event_queue.emit(Event(recipient="CapitilizationAgent", payload="hello", kind=EventKind.INPUT, sender="main"))
critic = Critic("Critic", event_queue.into_callable())
suggester = Suggester("Suggester", event_queue.into_callable())
group_chat_manager = GroupChatManager("Manager", event_queue.into_callable(), [critic, suggester])
processor = EventRouter(event_queue, [critic, suggester, group_chat_manager])
await event_queue.emit(InputEvent(message="Go", sender="external"))
await processor.process_next()
await processor.process_next()
await processor.process_next()
await processor.process_next()
await processor.process_next()
await processor.process_next()
await processor.process_next()
await processor.process_next()
await processor.process_next()
await processor.process_next()
await processor.process_next()