From a37c8ddef10236517c83419a592f68ec8061049f Mon Sep 17 00:00:00 2001 From: XuhuiZhou Date: Thu, 28 Nov 2024 17:56:34 -0500 Subject: [PATCH 01/10] initial framework --- .../sotopia_original_replica/llm_agent.py | 87 ++++++++++++++++ .../sotopia_original_replica/origin.svg | 1 + .../sotopia_original_replica/origin.toml | 49 ++++++++++ .../sotopia_original_replica/readme.md | 13 +++ sotopia/experimental/agents/datamodels.py | 41 ++++++++ sotopia/experimental/agents/moderator.py | 98 +++++++++++++++++++ 6 files changed, 289 insertions(+) create mode 100644 examples/experimental/sotopia_original_replica/llm_agent.py create mode 100644 examples/experimental/sotopia_original_replica/origin.svg create mode 100644 examples/experimental/sotopia_original_replica/origin.toml create mode 100644 examples/experimental/sotopia_original_replica/readme.md create mode 100644 sotopia/experimental/agents/datamodels.py create mode 100644 sotopia/experimental/agents/moderator.py diff --git a/examples/experimental/sotopia_original_replica/llm_agent.py b/examples/experimental/sotopia_original_replica/llm_agent.py new file mode 100644 index 00000000..05149926 --- /dev/null +++ b/examples/experimental/sotopia_original_replica/llm_agent.py @@ -0,0 +1,87 @@ +import logging +import sys +from rich.logging import RichHandler + + +from aact import NodeFactory + +from sotopia.experimental.agents.base_agent import BaseAgent +from sotopia.experimental.agents.datamodels import Observation, AgentAction + +from sotopia.generation_utils import agenerate_action + + +# Check Python version +if sys.version_info >= (3, 11): + pass +else: + pass + +# Configure logging +FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s" +logging.basicConfig( + level=logging.WARNING, + format=FORMAT, + datefmt="[%X]", + handlers=[RichHandler()], +) + + +@NodeFactory.register("llm_agent") +class LLMAgent(BaseAgent[Observation, AgentAction]): + def __init__( + self, + input_channels: list[str], + output_channel: str, + query_interval: int, + agent_name: str, + goal: str, + model_name: str, + redis_url: str, + ): + super().__init__( + [(input_channel, Observation) for input_channel in input_channels], + [(output_channel, AgentAction)], + redis_url, + ) + self.output_channel = output_channel + self.query_interval = query_interval + self.count_ticks = 0 + self.message_history: list[tuple[str, str, str]] = [] + self.name = agent_name + self.model_name = model_name + self.goal = goal + + def _format_message_history( + self, message_history: list[tuple[str, str, str]] + ) -> str: + ## TODO: akhatua Fix the mapping of action to be gramatically correct + return "\n".join( + (f"{speaker} {action} {message}") + for speaker, action, message in message_history + ) + + async def aact(self, obs: Observation) -> AgentAction: + self.message_history.append( + (obs.agent_name, self.name, obs.to_natural_language()) + ) + + if len(obs.available_actions) == 1 and "none" in obs.available_actions: + return AgentAction( + output_channel=self.output_channel, action_type="none", argument="" + ) + else: + action = await agenerate_action( + self.model_name, + history=self._format_message_history(self.message_history), + turn_number=obs.turn_number, + action_types=obs.available_actions, + agent=self.name, + goal=self.goal, + ) + + return AgentAction( + output_channel=self.output_channel, + action_type=action.action_type, + argument=action.argument, + ) diff --git a/examples/experimental/sotopia_original_replica/origin.svg b/examples/experimental/sotopia_original_replica/origin.svg new file mode 100644 index 00000000..78717b14 --- /dev/null +++ b/examples/experimental/sotopia_original_replica/origin.svg @@ -0,0 +1 @@ +

examples/experimental/sotopia_original_replica/origin.toml

Jane:moderator

Jack:moderator

moderator:Jane

moderator:Jack

Jane:Jack

Jack:Jane

Agent:Runtime

'Jane'

'moderator'

'Jack'

'chat_print'

diff --git a/examples/experimental/sotopia_original_replica/origin.toml b/examples/experimental/sotopia_original_replica/origin.toml new file mode 100644 index 00000000..55814ca3 --- /dev/null +++ b/examples/experimental/sotopia_original_replica/origin.toml @@ -0,0 +1,49 @@ +redis_url = "redis://localhost:6379/0" +extra_modules = ["examples.experimental.sotopia_original_replica.llm_agent", "examples.experimental.nodes.chat_print_node", "sotopia.experimental.agents.moderator"] + + +[[nodes]] +node_name = "moderator" +node_class = "moderator" + +[nodes.node_args] +output_channels = ["moderator:Jane", "moderator:Jack"] +input_channels = ["Jane:moderator", "Jack:moderator"] +agent_mapping = {"moderator:Jane" = "Jane", "moderator:Jack" = "Jack"} + +[[nodes]] +node_name = "Jack" +node_class = "llm_agent" + +[nodes.node_args] +query_interval = 5 +input_channels = ["moderator:Jack"] +output_channel = "Jack:moderator" +goal = "Your goal is to effectively test Jane's technical ability and finally decide if she has passed the interview. Make sure to also evaluate her communication skills, problem-solving approach, and enthusiasm." +model_name = "gpt-4o-mini" +agent_name = "Jack" + + +[[nodes]] +node_name = "Jane" +node_class = "llm_agent" + +[nodes.node_args] +query_interval = 7 +output_channel = "Jane:moderator" +input_channels = ["moderator:Jane"] +goal = "Your goal is to do well in the interview by demonstrating your technical skills, clear communication, and enthusiasm for the position. Stay calm, ask clarifying questions when needed, and confidently explain your thought process." +model_name = "gpt-4o-mini" +agent_name = "Jane" + +[[nodes]] +node_name = "chat_print" +node_class = "chat_print" + +[nodes.node_args.print_channel_types] +"Jane:Jack" = "agent_action" +"Jack:Jane" = "agent_action" +"Agent:Runtime" = "agent_action" + +[nodes.node_args] +env_agents = ["Jack", "Jane"] diff --git a/examples/experimental/sotopia_original_replica/readme.md b/examples/experimental/sotopia_original_replica/readme.md new file mode 100644 index 00000000..cb3931dc --- /dev/null +++ b/examples/experimental/sotopia_original_replica/readme.md @@ -0,0 +1,13 @@ +To run this example, please use aact to launch. + +```bash +aact run-dataflow examples/experimental/sotopia_original_replica/origin.toml +``` + +To view the flow of the information, please run: + +```bash +aact draw-dataflow examples/experimental/sotopia_original_replica/origin.toml --svg-path examples/experimental/sotopia_original_replica/origin.svg +``` + +![Alt text](./origin.svg) diff --git a/sotopia/experimental/agents/datamodels.py b/sotopia/experimental/agents/datamodels.py new file mode 100644 index 00000000..8a097ea1 --- /dev/null +++ b/sotopia/experimental/agents/datamodels.py @@ -0,0 +1,41 @@ +from sotopia.messages import ActionType +from aact.messages import DataModel, DataModelFactory +from pydantic import Field + + +@DataModelFactory.register("observation") +class Observation(DataModel): + agent_name: str = Field(description="the name of the agent") + last_turn: str = Field(description="the last turn of the conversation") + turn_number: int = Field(description="the turn number of the conversation") + available_actions: list[ActionType] = Field(description="the available actions") + + def to_natural_language(self) -> str: + if self.turn_number == 0: + return f"\n{self.last_turn}\nConversation Starts:\n" + else: + return f"Turn #{self.turn_number-1}: {self.last_turn}\n" + + +@DataModelFactory.register("agent_action") +class AgentAction(DataModel): + output_channel: str = Field(description="the name of the output channel") + action_type: ActionType = Field( + description="whether to speak at this turn or choose to not do anything" + ) + argument: str = Field( + description="the utterance if choose to speak, the expression or gesture if choose non-verbal communication, or the physical action if choose action" + ) + + def to_natural_language(self) -> str: + match self.action_type: + case "none": + return "did nothing" + case "speak": + return f'said: "{self.argument}"' + case "non-verbal communication": + return f"[{self.action_type}] {self.argument}" + case "action": + return f"[{self.action_type}] {self.argument}" + case "leave": + return "left the conversation" diff --git a/sotopia/experimental/agents/moderator.py b/sotopia/experimental/agents/moderator.py new file mode 100644 index 00000000..aecc7d8a --- /dev/null +++ b/sotopia/experimental/agents/moderator.py @@ -0,0 +1,98 @@ +import asyncio +import sys + + +if sys.version_info < (3, 11): + pass +else: + pass + + +from aact import Message, NodeFactory +from aact.messages import DataModel, DataModelFactory + +from typing import Literal +from pydantic import Field + + +from .base_agent import BaseAgent +from .datamodels import AgentAction, Observation +from sotopia.messages import ActionType + + +@DataModelFactory.register("observations") +class Observations(DataModel): + observations_map: dict[str, Observation] = Field( + description="the observations of the agents" + ) + + +@NodeFactory.register("moderator") +class Moderator(BaseAgent[AgentAction, Observations]): + def __init__( + self, + input_channels: list[str], + output_channels: list[str], + agent_mapping: dict[str, str], + redis_url: str = "redis://localhost:6379/0", + action_order: Literal["simultaneous", "round-robin", "random"] = "round-robin", + available_actions: list[ActionType] = [ + "none", + "speak", + "non-verbal communication", + "action", + "leave", + ], + ): + super().__init__( + input_channel_types=[ + (input_channel, AgentAction) for input_channel in input_channels + ], + output_channel_types=[ + (output_channel, Observations) for output_channel in output_channels + ], + redis_url=redis_url, + ) + self.observation_queue: asyncio.Queue[AgentAction] = asyncio.Queue() + self.task_scheduler: asyncio.Task[None] | None = None + self.shutdown_event: asyncio.Event = asyncio.Event() + self.agent_mapping: dict[str, str] = agent_mapping + self.action_order: Literal["simultaneous", "round-robin", "random"] = ( + action_order + ) + self.available_actions: list[ActionType] = available_actions + self.turn_number: int = 0 + self.current_agent_index: int = 0 + self.agents: list[str] = list(agent_mapping.values()) + + async def send(self, action: Observations) -> None: + for output_channel, output_channel_type in self.output_channel_types.items(): + await self.r.publish( + output_channel, + Message[output_channel_type]( + data=action.observations_map[output_channel] + ).model_dump_json(), # type:ignore[valid-type] + ) + + async def aact(self, agent_action: AgentAction) -> Observations | None: + self.turn_number += 1 + observations_map: dict[str, Observation] = {} + for output_channel, output_channel_type in self.output_channel_types.items(): + agent_name = self.agent_mapping[output_channel] + available_actions: list[ActionType] = ["none"] + if self.action_order == "round-robin": + if agent_name == self.agents[self.current_agent_index]: + available_actions = self.available_actions + self.current_agent_index = (self.current_agent_index + 1) % len( + self.agents + ) + + observation = Observation( + agent_name=agent_name, + last_turn=agent_action.to_natural_language(), + turn_number=self.turn_number, + available_actions=available_actions, + ) + observations_map[output_channel] = observation + + return Observations(observations_map=observations_map) From 9c78302ac33507230a5370c60477f3c0bb2edfda Mon Sep 17 00:00:00 2001 From: XuhuiZhou Date: Fri, 29 Nov 2024 22:31:02 -0500 Subject: [PATCH 02/10] initial conv --- .../sotopia_original_replica/llm_agent.py | 54 +++++++++++-------- .../sotopia_original_replica/origin.toml | 11 ++-- sotopia/experimental/agents/datamodels.py | 1 + sotopia/experimental/agents/moderator.py | 41 +++++++++++--- 4 files changed, 73 insertions(+), 34 deletions(-) diff --git a/examples/experimental/sotopia_original_replica/llm_agent.py b/examples/experimental/sotopia_original_replica/llm_agent.py index 05149926..e20f6ad3 100644 --- a/examples/experimental/sotopia_original_replica/llm_agent.py +++ b/examples/experimental/sotopia_original_replica/llm_agent.py @@ -8,7 +8,8 @@ from sotopia.experimental.agents.base_agent import BaseAgent from sotopia.experimental.agents.datamodels import Observation, AgentAction -from sotopia.generation_utils import agenerate_action +from sotopia.generation_utils import agenerate +from sotopia.generation_utils.generate import StrOutputParser # Check Python version @@ -47,41 +48,50 @@ def __init__( self.output_channel = output_channel self.query_interval = query_interval self.count_ticks = 0 - self.message_history: list[tuple[str, str, str]] = [] + self.message_history: list[Observation] = [] self.name = agent_name self.model_name = model_name self.goal = goal - def _format_message_history( - self, message_history: list[tuple[str, str, str]] - ) -> str: + def _format_message_history(self, message_history: list[Observation]) -> str: ## TODO: akhatua Fix the mapping of action to be gramatically correct - return "\n".join( - (f"{speaker} {action} {message}") - for speaker, action, message in message_history - ) + return "\n".join(message.to_natural_language() for message in message_history) async def aact(self, obs: Observation) -> AgentAction: - self.message_history.append( - (obs.agent_name, self.name, obs.to_natural_language()) - ) + self.message_history.append(obs) if len(obs.available_actions) == 1 and "none" in obs.available_actions: return AgentAction( - output_channel=self.output_channel, action_type="none", argument="" + agent_name=self.name, + output_channel=self.output_channel, + action_type="none", + argument="", ) else: - action = await agenerate_action( - self.model_name, - history=self._format_message_history(self.message_history), - turn_number=obs.turn_number, - action_types=obs.available_actions, - agent=self.name, - goal=self.goal, + history = self._format_message_history(self.message_history) + action: str = await agenerate( + model_name=self.model_name, + template="Imagine that you are a friend of the other persons. Here is the " + "conversation between you and them.\n" + "You are {agent_name} in the conversation.\n" + "{message_history}\n" + "and you plan to {goal}.\n" + "You can choose to interrupt the other person " + "by saying something or not to interrupt by outputting notiong. What would you say? " + "Please only output a sentence or not outputting anything." + "{format_instructions}", + input_values={ + "message_history": history, + "goal": self.goal, + "agent_name": self.name, + }, + temperature=0.7, + output_parser=StrOutputParser(), ) return AgentAction( + agent_name=self.name, output_channel=self.output_channel, - action_type=action.action_type, - argument=action.argument, + action_type="speak", + argument=action, ) diff --git a/examples/experimental/sotopia_original_replica/origin.toml b/examples/experimental/sotopia_original_replica/origin.toml index 55814ca3..6af998f7 100644 --- a/examples/experimental/sotopia_original_replica/origin.toml +++ b/examples/experimental/sotopia_original_replica/origin.toml @@ -10,6 +10,8 @@ node_class = "moderator" output_channels = ["moderator:Jane", "moderator:Jack"] input_channels = ["Jane:moderator", "Jack:moderator"] agent_mapping = {"moderator:Jane" = "Jane", "moderator:Jack" = "Jack"} +scenario = "Two friends are sitting in a cafe and catching up with each other's lives." +max_turns = 20 [[nodes]] node_name = "Jack" @@ -19,7 +21,7 @@ node_class = "llm_agent" query_interval = 5 input_channels = ["moderator:Jack"] output_channel = "Jack:moderator" -goal = "Your goal is to effectively test Jane's technical ability and finally decide if she has passed the interview. Make sure to also evaluate her communication skills, problem-solving approach, and enthusiasm." +goal = "Your goal is to borrow 5000 dollars from Jane." model_name = "gpt-4o-mini" agent_name = "Jack" @@ -32,7 +34,7 @@ node_class = "llm_agent" query_interval = 7 output_channel = "Jane:moderator" input_channels = ["moderator:Jane"] -goal = "Your goal is to do well in the interview by demonstrating your technical skills, clear communication, and enthusiasm for the position. Stay calm, ask clarifying questions when needed, and confidently explain your thought process." +goal = "Your goal is to help Jack however, you are in a finicial crisis yourself and can only afford to give him 500 dollars." model_name = "gpt-4o-mini" agent_name = "Jane" @@ -41,9 +43,8 @@ node_name = "chat_print" node_class = "chat_print" [nodes.node_args.print_channel_types] -"Jane:Jack" = "agent_action" -"Jack:Jane" = "agent_action" -"Agent:Runtime" = "agent_action" +"Jane:moderator" = "agent_action" +"Jack:moderator" = "agent_action" [nodes.node_args] env_agents = ["Jack", "Jane"] diff --git a/sotopia/experimental/agents/datamodels.py b/sotopia/experimental/agents/datamodels.py index 8a097ea1..a243a52a 100644 --- a/sotopia/experimental/agents/datamodels.py +++ b/sotopia/experimental/agents/datamodels.py @@ -19,6 +19,7 @@ def to_natural_language(self) -> str: @DataModelFactory.register("agent_action") class AgentAction(DataModel): + agent_name: str = Field(description="the name of the agent") output_channel: str = Field(description="the name of the output channel") action_type: ActionType = Field( description="whether to speak at this turn or choose to not do anything" diff --git a/sotopia/experimental/agents/moderator.py b/sotopia/experimental/agents/moderator.py index aecc7d8a..cee75b8e 100644 --- a/sotopia/experimental/agents/moderator.py +++ b/sotopia/experimental/agents/moderator.py @@ -11,7 +11,7 @@ from aact import Message, NodeFactory from aact.messages import DataModel, DataModelFactory -from typing import Literal +from typing import Literal, Self from pydantic import Field @@ -28,11 +28,12 @@ class Observations(DataModel): @NodeFactory.register("moderator") -class Moderator(BaseAgent[AgentAction, Observations]): +class Moderator(BaseAgent[AgentAction, Observation]): def __init__( self, input_channels: list[str], output_channels: list[str], + scenario: str, agent_mapping: dict[str, str], redis_url: str = "redis://localhost:6379/0", action_order: Literal["simultaneous", "round-robin", "random"] = "round-robin", @@ -43,13 +44,14 @@ def __init__( "action", "leave", ], + max_turns: int = 20, ): super().__init__( input_channel_types=[ (input_channel, AgentAction) for input_channel in input_channels ], output_channel_types=[ - (output_channel, Observations) for output_channel in output_channels + (output_channel, Observation) for output_channel in output_channels ], redis_url=redis_url, ) @@ -62,7 +64,9 @@ def __init__( ) self.available_actions: list[ActionType] = available_actions self.turn_number: int = 0 + self.max_turns: int = max_turns self.current_agent_index: int = 0 + self.scenario: str = scenario self.agents: list[str] = list(agent_mapping.values()) async def send(self, action: Observations) -> None: @@ -74,8 +78,33 @@ async def send(self, action: Observations) -> None: ).model_dump_json(), # type:ignore[valid-type] ) + async def __aenter__(self) -> Self: + print(self.scenario) + await self.send( + Observations( + observations_map={ + output_channel: Observation( + agent_name="moderator", + last_turn=self.scenario, + turn_number=0, + available_actions=self.available_actions + if agent_name == self.agents[0] + else ["none"], + ) + for output_channel, agent_name in self.agent_mapping.items() + } + ) + ) + self.current_agent_index += 1 + self.task_scheduler = asyncio.create_task(self._task_scheduler()) + return await super().__aenter__() + async def aact(self, agent_action: AgentAction) -> Observations | None: - self.turn_number += 1 + if self.turn_number < 20: + self.turn_number += 1 + else: + self.shutdown_event.set() + return None observations_map: dict[str, Observation] = {} for output_channel, output_channel_type in self.output_channel_types.items(): agent_name = self.agent_mapping[output_channel] @@ -83,9 +112,6 @@ async def aact(self, agent_action: AgentAction) -> Observations | None: if self.action_order == "round-robin": if agent_name == self.agents[self.current_agent_index]: available_actions = self.available_actions - self.current_agent_index = (self.current_agent_index + 1) % len( - self.agents - ) observation = Observation( agent_name=agent_name, @@ -94,5 +120,6 @@ async def aact(self, agent_action: AgentAction) -> Observations | None: available_actions=available_actions, ) observations_map[output_channel] = observation + self.current_agent_index = (self.current_agent_index + 1) % len(self.agents) return Observations(observations_map=observations_map) From 4e13a106cbbb2a476ad39aeaa65b77a4cefb7ac3 Mon Sep 17 00:00:00 2001 From: XuhuiZhou Date: Fri, 29 Nov 2024 22:46:03 -0500 Subject: [PATCH 03/10] fix module error --- .../{llm_agent.py => llm_agent_sotopia.py} | 0 examples/experimental/sotopia_original_replica/origin.toml | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename examples/experimental/sotopia_original_replica/{llm_agent.py => llm_agent_sotopia.py} (100%) diff --git a/examples/experimental/sotopia_original_replica/llm_agent.py b/examples/experimental/sotopia_original_replica/llm_agent_sotopia.py similarity index 100% rename from examples/experimental/sotopia_original_replica/llm_agent.py rename to examples/experimental/sotopia_original_replica/llm_agent_sotopia.py diff --git a/examples/experimental/sotopia_original_replica/origin.toml b/examples/experimental/sotopia_original_replica/origin.toml index 6af998f7..fac38ee1 100644 --- a/examples/experimental/sotopia_original_replica/origin.toml +++ b/examples/experimental/sotopia_original_replica/origin.toml @@ -1,5 +1,5 @@ redis_url = "redis://localhost:6379/0" -extra_modules = ["examples.experimental.sotopia_original_replica.llm_agent", "examples.experimental.nodes.chat_print_node", "sotopia.experimental.agents.moderator"] +extra_modules = ["examples.experimental.sotopia_original_replica.llm_agent_sotopia", "examples.experimental.nodes.chat_print_node", "sotopia.experimental.agents.moderator"] [[nodes]] From 81fd841d1ec2cdf1bc1c084a464d32455327fa22 Mon Sep 17 00:00:00 2001 From: JXZhou <156194797+JXZhou0224@users.noreply.github.com> Date: Fri, 13 Dec 2024 10:01:18 +0800 Subject: [PATCH 04/10] feat: Add 3 new features to Moderator (#266) * feat:introduce booting procedure, saving, and ending chat to moderator * fix: moderator will now ignore none AgentAction, Observations now don't have to include all channels in the mapping * merge changes of example into the original one * fix: 1. save() method now accepts push_to_db config 2. booting()'s waiting time is changed to 0.1 sec * fix: rewrite booting() so that different agent will receive different background information * fix: moderator now inherits from Node directly, instead of from BaseAgent --------- Co-authored-by: JXZhou --- .../llm_agent_sotopia.py | 18 +- .../sotopia_original_replica/origin.toml | 2 + sotopia/experimental/agents/moderator.py | 186 +++++++++++++++--- 3 files changed, 176 insertions(+), 30 deletions(-) diff --git a/examples/experimental/sotopia_original_replica/llm_agent_sotopia.py b/examples/experimental/sotopia_original_replica/llm_agent_sotopia.py index e20f6ad3..b421213f 100644 --- a/examples/experimental/sotopia_original_replica/llm_agent_sotopia.py +++ b/examples/experimental/sotopia_original_replica/llm_agent_sotopia.py @@ -2,7 +2,6 @@ import sys from rich.logging import RichHandler - from aact import NodeFactory from sotopia.experimental.agents.base_agent import BaseAgent @@ -11,7 +10,6 @@ from sotopia.generation_utils import agenerate from sotopia.generation_utils.generate import StrOutputParser - # Check Python version if sys.version_info >= (3, 11): pass @@ -58,6 +56,14 @@ def _format_message_history(self, message_history: list[Observation]) -> str: return "\n".join(message.to_natural_language() for message in message_history) async def aact(self, obs: Observation) -> AgentAction: + if obs.turn_number == -1: + return AgentAction( + agent_name=self.name, + output_channel=self.output_channel, + action_type="none", + argument=self.model_name, + ) + self.message_history.append(obs) if len(obs.available_actions) == 1 and "none" in obs.available_actions: @@ -67,6 +73,14 @@ async def aact(self, obs: Observation) -> AgentAction: action_type="none", argument="", ) + elif len(obs.available_actions) == 1 and "leave" in obs.available_actions: + self.shutdown_event.set() + return AgentAction( + agent_name=self.name, + output_channel=self.output_channel, + action_type="leave", + argument="", + ) else: history = self._format_message_history(self.message_history) action: str = await agenerate( diff --git a/examples/experimental/sotopia_original_replica/origin.toml b/examples/experimental/sotopia_original_replica/origin.toml index fac38ee1..cd317eed 100644 --- a/examples/experimental/sotopia_original_replica/origin.toml +++ b/examples/experimental/sotopia_original_replica/origin.toml @@ -9,9 +9,11 @@ node_class = "moderator" [nodes.node_args] output_channels = ["moderator:Jane", "moderator:Jack"] input_channels = ["Jane:moderator", "Jack:moderator"] +agent_backgrounds = {"Jane" = "", "Jack" = ""} agent_mapping = {"moderator:Jane" = "Jane", "moderator:Jack" = "Jack"} scenario = "Two friends are sitting in a cafe and catching up with each other's lives." max_turns = 20 +push_to_db = true [[nodes]] node_name = "Jack" diff --git a/sotopia/experimental/agents/moderator.py b/sotopia/experimental/agents/moderator.py index cee75b8e..65e97014 100644 --- a/sotopia/experimental/agents/moderator.py +++ b/sotopia/experimental/agents/moderator.py @@ -8,14 +8,13 @@ pass -from aact import Message, NodeFactory +from aact import Message, NodeFactory, Node from aact.messages import DataModel, DataModelFactory -from typing import Literal, Self +from typing import Literal, Self, Any, AsyncIterator from pydantic import Field - -from .base_agent import BaseAgent +from sotopia.database import EpisodeLog from .datamodels import AgentAction, Observation from sotopia.messages import ActionType @@ -28,13 +27,14 @@ class Observations(DataModel): @NodeFactory.register("moderator") -class Moderator(BaseAgent[AgentAction, Observation]): +class Moderator(Node[AgentAction, Observation]): def __init__( self, input_channels: list[str], output_channels: list[str], scenario: str, agent_mapping: dict[str, str], + agent_backgrounds: dict[str, str], redis_url: str = "redis://localhost:6379/0", action_order: Literal["simultaneous", "round-robin", "random"] = "round-robin", available_actions: list[ActionType] = [ @@ -45,6 +45,7 @@ def __init__( "leave", ], max_turns: int = 20, + push_to_db: bool = False, ): super().__init__( input_channel_types=[ @@ -68,43 +69,172 @@ def __init__( self.current_agent_index: int = 0 self.scenario: str = scenario self.agents: list[str] = list(agent_mapping.values()) + self.agent_models: dict[str, str] = {} + self.agents_awake: dict[str, bool] = {name: False for name in self.agents} + self.all_agents_awake: asyncio.Event = asyncio.Event() + self.message_history: list[list[tuple[str, str, str]]] = [ + [("Environment", "Environment", self.scenario)] + ] + self.push_to_db = push_to_db + self.agent_backgrounds = agent_backgrounds + + if self.action_order == "round-robin": + pass + else: + raise NotImplementedError( + "the selected action order is currently not implemented" + ) + + async def __aenter__(self) -> Self: + print(self.scenario) + asyncio.create_task(self.booting()) + self.task_scheduler = asyncio.create_task(self._task_scheduler()) + return await super().__aenter__() + + async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None: + self.shutdown_event.set() + if self.task_scheduler is not None: + self.task_scheduler.cancel() + return await super().__aexit__(exc_type, exc_value, traceback) async def send(self, action: Observations) -> None: for output_channel, output_channel_type in self.output_channel_types.items(): - await self.r.publish( - output_channel, - Message[output_channel_type]( - data=action.observations_map[output_channel] - ).model_dump_json(), # type:ignore[valid-type] + if output_channel in action.observations_map: + await self.r.publish( + output_channel, + Message[output_channel_type]( # type:ignore[valid-type] + data=action.observations_map[output_channel] + ).model_dump_json(), + ) + + async def event_handler( + self, channel: str, message: Message[AgentAction] + ) -> AsyncIterator[tuple[str, Message[Observation]]]: + if channel in self.input_channel_types: + await self.observation_queue.put(message.data) + else: + raise ValueError(f"Invalid channel: {channel}") + yield "", self.output_type() + + async def _task_scheduler(self) -> None: + await self.all_agents_awake.wait() + while not self.shutdown_event.is_set(): + observation = await self.observation_queue.get() + action_or_none = await self.aact(observation) + if action_or_none is not None: + await self.send(action_or_none) + self.observation_queue.task_done() + + async def booting(self) -> None: + """ + 1. send checking message to agents for every 0.1 seconds, until all agents are awake + - this message has turn_number of -1 for identification, agents should not record this into actual message_history + - if the agent booted succesfully, he is expected to return its model name for record. + 2. (under round-robin action order)after all agents are awake, send agent[0] a message to allow the agent to start speaking + """ + while not self.all_agents_awake.is_set(): + await self.send( + Observations( + observations_map={ + output_channel: Observation( + agent_name="moderator", + last_turn=self.scenario, + turn_number=-1, + available_actions=["none"], + ) + for output_channel, agent_name in self.agent_mapping.items() + } + ) + ) + await asyncio.sleep(0.1) + while not self.observation_queue.empty(): + agent_action = await self.observation_queue.get() + self.agents_awake[agent_action.agent_name] = True + self.agent_models[agent_action.agent_name] = agent_action.argument + if False not in self.agents_awake.values(): + self.all_agents_awake.set() + + if self.action_order == "round-robin": + await self.send( + Observations( + observations_map={ + output_channel: Observation( + agent_name="moderator", + last_turn=self.agent_backgrounds[agent_name], + turn_number=0, + available_actions=self.available_actions + if agent_name == self.agents[0] + else ["none"], + ) + for output_channel, agent_name in self.agent_mapping.items() + } + ) ) + self.current_agent_index += 1 + + async def save(self) -> EpisodeLog: + """ + save the EpisodeLog to redis, without evaluating + TODO: specify what to be added inside tag + TODO: update the code so that EpisodeLog.render_for_humans() can work + -currently it cannot work because no AgentProfile has been uploaded to redis + -such a process should be done back in the agents' end + -also the current agentslist is consist of names, but not uuid's of agents + """ + epilog = EpisodeLog( + environment=self.scenario, + agents=self.agents, + tag=None, + models=list(self.agent_models.values()), + messages=self.message_history, + reasoning="", + rewards=[0] * len(self.agents), + rewards_prompt="", + ) + epilog.save() + # print(epilog.render_for_humans()) + return epilog - async def __aenter__(self) -> Self: - print(self.scenario) - await self.send( - Observations( + async def aact(self, agent_action: AgentAction) -> Observations | None: + if agent_action.action_type == "none": + return None + + if len(self.message_history) == 1: + self.message_history[0].append( + ( + agent_action.agent_name, + "Environment", + agent_action.to_natural_language(), + ) + ) + else: + self.message_history.append( + [ + ( + agent_action.agent_name, + "Environment", + agent_action.to_natural_language(), + ) + ] + ) + + if self.turn_number < self.max_turns: + self.turn_number += 1 + else: + await self.save() + self.shutdown_event.set() + return Observations( observations_map={ output_channel: Observation( agent_name="moderator", last_turn=self.scenario, - turn_number=0, - available_actions=self.available_actions - if agent_name == self.agents[0] - else ["none"], + turn_number=self.turn_number + 1, + available_actions=["leave"], ) for output_channel, agent_name in self.agent_mapping.items() } ) - ) - self.current_agent_index += 1 - self.task_scheduler = asyncio.create_task(self._task_scheduler()) - return await super().__aenter__() - async def aact(self, agent_action: AgentAction) -> Observations | None: - if self.turn_number < 20: - self.turn_number += 1 - else: - self.shutdown_event.set() - return None observations_map: dict[str, Observation] = {} for output_channel, output_channel_type in self.output_channel_types.items(): agent_name = self.agent_mapping[output_channel] From bb761af5b5b326e4b323901c689230b9d07c1999 Mon Sep 17 00:00:00 2001 From: XuhuiZhou Date: Fri, 13 Dec 2024 15:18:30 -0500 Subject: [PATCH 05/10] add save condition for moderator --- sotopia/experimental/agents/moderator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sotopia/experimental/agents/moderator.py b/sotopia/experimental/agents/moderator.py index 65e97014..73bddb00 100644 --- a/sotopia/experimental/agents/moderator.py +++ b/sotopia/experimental/agents/moderator.py @@ -221,7 +221,8 @@ async def aact(self, agent_action: AgentAction) -> Observations | None: if self.turn_number < self.max_turns: self.turn_number += 1 else: - await self.save() + if self.push_to_db: + await self.save() self.shutdown_event.set() return Observations( observations_map={ From 7ec769b1e77bb188b8f00717da94634c0fa9a747 Mon Sep 17 00:00:00 2001 From: XuhuiZhou Date: Fri, 13 Dec 2024 15:18:49 -0500 Subject: [PATCH 06/10] push to db false --- examples/experimental/sotopia_original_replica/origin.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/experimental/sotopia_original_replica/origin.toml b/examples/experimental/sotopia_original_replica/origin.toml index cd317eed..297b229c 100644 --- a/examples/experimental/sotopia_original_replica/origin.toml +++ b/examples/experimental/sotopia_original_replica/origin.toml @@ -13,7 +13,7 @@ agent_backgrounds = {"Jane" = "", "Jack" = ""} agent_mapping = {"moderator:Jane" = "Jane", "moderator:Jack" = "Jack"} scenario = "Two friends are sitting in a cafe and catching up with each other's lives." max_turns = 20 -push_to_db = true +push_to_db = false [[nodes]] node_name = "Jack" From c2206642a9aab0d9772a8a8a4b68b49e839c0c82 Mon Sep 17 00:00:00 2001 From: XuhuiZhou Date: Fri, 13 Dec 2024 20:48:31 -0500 Subject: [PATCH 07/10] to fully stop --- .../nodes/initial_message_node.py | 2 ++ .../llm_agent_sotopia.py | 2 ++ .../sotopia_original_replica/origin.toml | 2 +- pyproject.toml | 3 ++ sotopia/experimental/agents/base_agent.py | 2 ++ sotopia/experimental/agents/moderator.py | 29 +++++++++++++++---- uv.lock | 9 ++---- 7 files changed, 36 insertions(+), 13 deletions(-) diff --git a/examples/experimental/nodes/initial_message_node.py b/examples/experimental/nodes/initial_message_node.py index 9cb7f63c..9ff4c3bd 100644 --- a/examples/experimental/nodes/initial_message_node.py +++ b/examples/experimental/nodes/initial_message_node.py @@ -18,6 +18,7 @@ def __init__( input_tick_channel: str, output_channels: list[str], env_scenario: str, + node_name: str, redis_url: str = "redis://localhost:6379/0", ): super().__init__( @@ -26,6 +27,7 @@ def __init__( (output_channel, Text) for output_channel in output_channels ], redis_url=redis_url, + node_name=node_name, ) self.env_scenario = env_scenario self.output_channels = output_channels diff --git a/examples/experimental/sotopia_original_replica/llm_agent_sotopia.py b/examples/experimental/sotopia_original_replica/llm_agent_sotopia.py index b421213f..abe95929 100644 --- a/examples/experimental/sotopia_original_replica/llm_agent_sotopia.py +++ b/examples/experimental/sotopia_original_replica/llm_agent_sotopia.py @@ -34,6 +34,7 @@ def __init__( output_channel: str, query_interval: int, agent_name: str, + node_name: str, goal: str, model_name: str, redis_url: str, @@ -42,6 +43,7 @@ def __init__( [(input_channel, Observation) for input_channel in input_channels], [(output_channel, AgentAction)], redis_url, + node_name, ) self.output_channel = output_channel self.query_interval = query_interval diff --git a/examples/experimental/sotopia_original_replica/origin.toml b/examples/experimental/sotopia_original_replica/origin.toml index 297b229c..7bf22527 100644 --- a/examples/experimental/sotopia_original_replica/origin.toml +++ b/examples/experimental/sotopia_original_replica/origin.toml @@ -12,7 +12,7 @@ input_channels = ["Jane:moderator", "Jack:moderator"] agent_backgrounds = {"Jane" = "", "Jack" = ""} agent_mapping = {"moderator:Jane" = "Jane", "moderator:Jack" = "Jack"} scenario = "Two friends are sitting in a cafe and catching up with each other's lives." -max_turns = 20 +max_turns = 2 push_to_db = false [[nodes]] diff --git a/pyproject.toml b/pyproject.toml index 57af6cc3..b9edcc94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,6 +69,9 @@ plugins = [ module = "transformers.*" ignore_missing_imports = true +[tool.uv.sources] +aact = { git = "https://github.com/ProKil/aact" , branch = "feature/node-manager" } + [tool.pytest.ini_options] testpaths = ["tests"] python_files = "test_*.py" diff --git a/sotopia/experimental/agents/base_agent.py b/sotopia/experimental/agents/base_agent.py index a7bbafae..6d9466bb 100644 --- a/sotopia/experimental/agents/base_agent.py +++ b/sotopia/experimental/agents/base_agent.py @@ -22,11 +22,13 @@ def __init__( input_channel_types: list[tuple[str, type[T_agent_observation]]], output_channel_types: list[tuple[str, type[T_agent_action]]], redis_url: str = "redis://localhost:6379/0", + node_name: str = "base_agent", ): super().__init__( input_channel_types=input_channel_types, output_channel_types=output_channel_types, redis_url=redis_url, + node_name=node_name, ) self.observation_queue: asyncio.Queue[T_agent_observation] = asyncio.Queue() diff --git a/sotopia/experimental/agents/moderator.py b/sotopia/experimental/agents/moderator.py index 73bddb00..6300a3af 100644 --- a/sotopia/experimental/agents/moderator.py +++ b/sotopia/experimental/agents/moderator.py @@ -34,6 +34,7 @@ def __init__( output_channels: list[str], scenario: str, agent_mapping: dict[str, str], + node_name: str, agent_backgrounds: dict[str, str], redis_url: str = "redis://localhost:6379/0", action_order: Literal["simultaneous", "round-robin", "random"] = "round-robin", @@ -55,6 +56,7 @@ def __init__( (output_channel, Observation) for output_channel in output_channels ], redis_url=redis_url, + node_name=node_name, ) self.observation_queue: asyncio.Queue[AgentAction] = asyncio.Queue() self.task_scheduler: asyncio.Task[None] | None = None @@ -97,13 +99,13 @@ async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None self.task_scheduler.cancel() return await super().__aexit__(exc_type, exc_value, traceback) - async def send(self, action: Observations) -> None: + async def send(self, observations: Observations) -> None: for output_channel, output_channel_type in self.output_channel_types.items(): - if output_channel in action.observations_map: + if output_channel in observations.observations_map: await self.r.publish( output_channel, Message[output_channel_type]( # type:ignore[valid-type] - data=action.observations_map[output_channel] + data=observations.observations_map[output_channel] ).model_dump_json(), ) @@ -172,6 +174,19 @@ async def booting(self) -> None: ) self.current_agent_index += 1 + async def wrap_up_and_stop(self) -> None: + if self.push_to_db: + await self.save() + await asyncio.sleep(0.5) + print("stopping all agents") + for output_channel, output_channel_type in self.output_channel_types.items(): + await self.r.publish( + output_channel, + Message[output_channel_type]( # type:ignore[valid-type] + data=f"shutdown:{self.node_name}" + ).model_dump_json(), + ) + async def save(self) -> EpisodeLog: """ save the EpisodeLog to redis, without evaluating @@ -196,6 +211,11 @@ async def save(self) -> EpisodeLog: return epilog async def aact(self, agent_action: AgentAction) -> Observations | None: + if agent_action.action_type == "leave": + self.agents_awake[agent_action.agent_name] = False + if True not in self.agents_awake.values(): + await self.wrap_up_and_stop() + return None if agent_action.action_type == "none": return None @@ -221,9 +241,6 @@ async def aact(self, agent_action: AgentAction) -> Observations | None: if self.turn_number < self.max_turns: self.turn_number += 1 else: - if self.push_to_db: - await self.save() - self.shutdown_event.set() return Observations( observations_map={ output_channel: Observation( diff --git a/uv.lock b/uv.lock index 5017e0e0..217200dd 100644 --- a/uv.lock +++ b/uv.lock @@ -10,9 +10,10 @@ resolution-markers = [ [[package]] name = "aact" version = "0.0.10" -source = { registry = "https://pypi.org/simple" } +source = { git = "https://github.com/ProKil/aact?branch=feature%2Fnode-manager#56cd2a2aad8a0e806e4f3a170e848cb1e1ad0720" } dependencies = [ { name = "aiofiles" }, + { name = "aiohttp" }, { name = "aiostream" }, { name = "numpy" }, { name = "pydantic" }, @@ -22,10 +23,6 @@ dependencies = [ { name = "tomlkit", marker = "python_full_version < '3.11'" }, { name = "typer" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/6e/9f/2b32aca3e2fe614df4e04a074870b6b27ef037af62f639b0e4d0b33abb31/aact-0.0.10.tar.gz", hash = "sha256:0cde5360d27bab002a43e9895c4006bfa541f6c2db798412f4aad1fdb685632e", size = 113329 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/31/18/32beed32416f8c9618ed4fc42e33eef94d7c181caf59c6909b3841047006/aact-0.0.10-py3-none-any.whl", hash = "sha256:2c1959666270acc681aafc1452aa089cb26a24a0871b01faa7761fa300b2fc9a", size = 29102 }, -] [[package]] name = "absl-py" @@ -3144,7 +3141,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "aact" }, + { name = "aact", git = "https://github.com/ProKil/aact?branch=feature%2Fnode-manager" }, { name = "absl-py", specifier = ">=2.0.0,<3.0.0" }, { name = "anthropic", marker = "extra == 'anthropic'" }, { name = "beartype", specifier = ">=0.14.0,<0.20.0" }, From 7fc721245285234b8d6ffd5332f943ff95e3dce0 Mon Sep 17 00:00:00 2001 From: XuhuiZhou Date: Fri, 13 Dec 2024 21:24:44 -0500 Subject: [PATCH 08/10] stopping all agents --- sotopia/experimental/agents/moderator.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/sotopia/experimental/agents/moderator.py b/sotopia/experimental/agents/moderator.py index 6300a3af..4cf8261d 100644 --- a/sotopia/experimental/agents/moderator.py +++ b/sotopia/experimental/agents/moderator.py @@ -179,13 +179,10 @@ async def wrap_up_and_stop(self) -> None: await self.save() await asyncio.sleep(0.5) print("stopping all agents") - for output_channel, output_channel_type in self.output_channel_types.items(): - await self.r.publish( - output_channel, - Message[output_channel_type]( # type:ignore[valid-type] - data=f"shutdown:{self.node_name}" - ).model_dump_json(), - ) + await self.r.publish( + f"shutdown:{self.node_name}", + "shutdown", + ) async def save(self) -> EpisodeLog: """ From a3fb6813abe758cad82c70f5f234537857f9a76b Mon Sep 17 00:00:00 2001 From: XuhuiZhou Date: Fri, 13 Dec 2024 21:27:13 -0500 Subject: [PATCH 09/10] fix mypy --- tests/experimental/test_agent.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/experimental/test_agent.py b/tests/experimental/test_agent.py index 020c2131..834c4286 100644 --- a/tests/experimental/test_agent.py +++ b/tests/experimental/test_agent.py @@ -19,11 +19,13 @@ async def aact(self, observation: Tick) -> Tick: @pytest.mark.asyncio async def test_base_agent() -> None: async with ReturnPlusOneAgent( + node_name="test_base_agent", input_channel_types=[("input", Tick)], output_channel_types=[("output", Tick)], redis_url="redis://localhost:6379/0", ) as agent1: async with ReturnPlusOneAgent( + node_name="test_base_agent_2", input_channel_types=[("output", Tick)], output_channel_types=[("final", Tick)], redis_url="redis://localhost:6379/0", From ffe0116fc49210be5ebc81f77bd21095b17fd944 Mon Sep 17 00:00:00 2001 From: XuhuiZhou Date: Fri, 13 Dec 2024 21:34:19 -0500 Subject: [PATCH 10/10] fix mypy error --- sotopia/experimental/agents/moderator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sotopia/experimental/agents/moderator.py b/sotopia/experimental/agents/moderator.py index 4cf8261d..ce57fb38 100644 --- a/sotopia/experimental/agents/moderator.py +++ b/sotopia/experimental/agents/moderator.py @@ -3,15 +3,15 @@ if sys.version_info < (3, 11): - pass + from typing_extensions import Self else: - pass + from typing import Self from aact import Message, NodeFactory, Node from aact.messages import DataModel, DataModelFactory -from typing import Literal, Self, Any, AsyncIterator +from typing import Literal, Any, AsyncIterator from pydantic import Field from sotopia.database import EpisodeLog