implemented the evaluator for multiagent
JXZhou authored and JXZhou committed Jan 15, 2025
1 parent 7558927 commit 8fc24b5
Showing 5 changed files with 160 additions and 39 deletions.
@@ -8,10 +8,12 @@
 from sotopia.experimental.agents.base_agent import BaseAgent
 from sotopia.experimental.agents.datamodels import Observation, AgentAction
 from sotopia.database.persistent_profile import AgentProfile
+from typing import Any

 from sotopia.generation_utils import agenerate
 from sotopia.generation_utils.generate import StrOutputParser

+from time import sleep
 # Check Python version
 if sys.version_info >= (3, 11):
     pass
@@ -39,7 +41,7 @@ def __init__(
         model_name: str,
         goal: str,
         agent_name: str | None = None,
-        background: dict | None = None,
+        background: dict[str, Any] | None = None,
         agent_pk: str | None = None,
         redis_url: str = "redis://localhost:6379/0",
     ):
@@ -54,14 +56,15 @@ def __init__(
         self.count_ticks: int = 0
         self.message_history: list[Observation] = []
         self.goal: str = goal
-        self.model_name: str = model_name
-        self.agent_profile_pk: str = agent_pk
-        self.name: str = agent_name
-        self.background: dict = background
+        self.model_name: str = model_name
+        self.agent_profile_pk: str | None = agent_pk
+        self.name: str | None = agent_name
+        self.background: dict[str, Any] | None = background
         self.awake: bool = False

-    def set_profile(self, use_pk_value: bool):
-        profile: AgentProfile = None
+    def set_profile(self, use_pk_value: bool) -> None:
         if not use_pk_value:
+            assert self.background is not None and self.name is not None, "Background and name must be provided"
             if " " in self.name:
                 first_name, last_name = self.name.split(" ", 1)
             else:
@@ -84,8 +87,16 @@ def _format_message_history(self, message_history: list[Observation]) -> str:

     async def aact(self, obs: Observation) -> AgentAction:
         if obs.turn_number == -1:
+            if self.awake:
+                return AgentAction(
+                    agent_name=self.name,
+                    output_channel=self.output_channel,
+                    action_type="none",
+                    argument="",
+                )
             args = json.loads(obs.last_turn)
             self.set_profile(args["use_pk_value"])
+            self.awake = True
             return AgentAction(
                 agent_name=self.name,
                 output_channel=self.output_channel,
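
The change above hardens the agent wake-up handshake: a turn_number of -1 marks the moderator's wake-up probe, whose payload carries use_pk_value, and a re-delivered probe is now answered with a "none" action instead of re-running set_profile. Below is a minimal editorial sketch of that idempotent handshake; it is not code from the commit, and SketchAgent and on_wakeup are invented names.

import json

# Sketch of the idempotent wake-up handshake (field name from the diff).
class SketchAgent:
    def __init__(self) -> None:
        self.awake = False

    def on_wakeup(self, last_turn: str) -> str:
        if self.awake:                       # duplicate probe: acknowledge only
            return "none"
        args = json.loads(last_turn)
        assert isinstance(args["use_pk_value"], bool)
        self.awake = True                    # set_profile(...) would run here
        return "profile-loaded"

agent = SketchAgent()
probe = json.dumps({"use_pk_value": False})
print(agent.on_wakeup(probe))  # -> profile-loaded
print(agent.on_wakeup(probe))  # -> none (second delivery is ignored)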
9 changes: 5 additions & 4 deletions examples/experimental/sotopia_original_replica/origin.toml
@@ -1,5 +1,5 @@
 redis_url = "redis://localhost:6379/0"
-extra_modules = ["examples.experimental.sotopia_original_replica.llm_agent_sotopia", "examples.experimental.nodes.chat_print_node", "sotopia.experimental.agents.moderator"]
+extra_modules = ["examples.experimental.sotopia_original_replica.llm_agent_sotopia", "examples.experimental.nodes.chat_print_node", "sotopia.experimental.agents.moderator", "sotopia.experimental.agents.evaluators"]


 [[nodes]]
@@ -9,10 +9,10 @@ node_class = "moderator"
 [nodes.node_args]
 output_channels = ["moderator:Jane", "moderator:Jack"]
 input_channels = ["Jane:moderator", "Jack:moderator"]
-evaluator_channels = ["evaluator:moderator"]
+evaluator_channels = [["evaluator:moderator", "moderator:evaluator"]]
 agent_mapping = {"moderator:Jane" = "Jane", "moderator:Jack" = "Jack"}
 scenario = "Two friends are sitting in a cafe and catching up with each other's lives."
-max_turns = 2
+max_turns = 3
 push_to_db = false
 evaluate_episode = true
 use_pk_value = false
@@ -63,4 +63,5 @@ node_class = "evaluator"

 [nodes.node_args]
 input_channels = ["moderator:evaluator"]
-output_channels = ["moderator:evaluator"]
+output_channels = ["evaluator:moderator"]
+model_name = "gpt-4o-mini"
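
A note on the new shape of evaluator_channels (an editorial sketch, not part of the commit): each entry is now a two-element pair, where index 0 is the channel the moderator subscribes to and index 1 is the channel it publishes the episode log on, matching how moderator.py below indexes channel[0] and evaluator_channel[1]. The loop here is illustrative only.

# Values mirror the TOML above; the unpacking follows moderator.py's usage.
evaluator_channels = [["evaluator:moderator", "moderator:evaluator"]]

for inbound, outbound in evaluator_channels:
    print(f"moderator subscribes to {inbound!r} and publishes to {outbound!r}")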
2 changes: 1 addition & 1 deletion sotopia/database/logs.py
@@ -28,7 +28,7 @@ class BaseEpisodeLog(BaseModel):
     models: list[str] | None = Field(index=True, default=[])
     messages: list[list[tuple[str, str, str]]]  # Messages arranged by turn
     reasoning: str = Field(default="")
-    rewards: list[tuple[float, dict[str, float]] | float]  # Rewards arranged by turn
+    rewards: list[tuple[float, dict[str, float]] | float | dict[str, dict]]  # Rewards arranged by turn
     rewards_prompt: str

     @model_validator(mode="after")
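
For orientation, a hedged editorial sketch (values invented) of the three element shapes the widened rewards annotation now accepts; the dict form is what the new multi-agent evaluator produces.

rewards_placeholder = [0.0, 0.0]               # plain float per agent
rewards_scored = [(3.5, {"goal": 7.0})]        # (overall, per-dimension) tuple
rewards_multi_agent = [{"Jane": {"goal": 7}}]  # dict keyed by agent name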
84 changes: 78 additions & 6 deletions sotopia/experimental/agents/evaluators.py
@@ -1,27 +1,99 @@
-from aact import NodeFactory, Node
+import json
+
+from aact import NodeFactory, Node, Message
 from .base_agent import BaseAgent
 from .logs import EpisodeLog
 from .datamodels import AgentAction, Observation
+from sotopia.database.persistent_profile import AgentProfile
+
+from typing import AsyncIterator, Generic, TypeVar, Type, Any
+from pydantic import BaseModel, Field
+from asyncio import Event
+
+from sotopia.envs.evaluators import GoalDimension
+from sotopia.generation_utils.generate import agenerate
+from langchain.output_parsers import PydanticOutputParser
+
+
+default_reward_prompt = """
+{history}
+Based on the previous interactions, evaluate how well each of the participants achieves their goals.
+Here is the list of agents:
+{agent_list}
+Please output your evaluation following the format:
+{format_instructions}
+"""
+
+T_eval_dim = TypeVar("T_eval_dim", bound=BaseModel)
+
+
+class EvaluationForMutiAgents(BaseModel, Generic[T_eval_dim]):
+    agents_evaluation: dict[str, T_eval_dim] = Field(
+        description="The evaluation for each agent; the key is the agent name. Be sure to include every agent in the agent list. The value should follow the evaluation dimension format."
+    )
+

 @NodeFactory.register("evaluator")
-class Evaluator(Node[AgentAction, Observation]):
+class Evaluator(BaseAgent[Observation, AgentAction]):
     def __init__(
         self,
         node_name: str,
+        model_name: str,
         input_channels: list[str],
         output_channels: list[str],
         redis_url: str,
+        reward_prompt: str = default_reward_prompt,
+        eval_dim_class: str = "GoalDimension",
+        temperature: float = 0.0,
     ):
         super().__init__(
             input_channel_types=[
-                (input_channel, AgentAction) for input_channel in input_channels
+                (input_channel, Observation) for input_channel in input_channels
             ],
             output_channel_types=[
-                (output_channel, Observation) for output_channel in output_channels
+                (output_channel, AgentAction) for output_channel in output_channels
             ],
             node_name=node_name,
             redis_url=redis_url,
         )
+        self.output_channels = output_channels
+        self.model_name = model_name
+        self.reward_prompt = reward_prompt
+        self.temperature = temperature
+        if eval_dim_class == "GoalDimension":
+            self.response_format_class: Type[BaseModel] = EvaluationForMutiAgents[GoalDimension]
+        else:
+            raise ValueError(f"eval_dim_class {eval_dim_class!r} is not implemented")
+        # TODO: add a registry for evaluation dimension classes so a dimension can be initialized from a str

-    async def aevaluate(self, episode: EpisodeLog) -> AgentAction | None:
-        raise NotImplementedError
+    async def aact(self, content: Observation) -> AgentAction:
+        epilog = EpisodeLog(**json.loads(content.last_turn))
+
+        result = await self.aevaluate(epilog)
+        return AgentAction(
+            agent_name="evaluator",
+            output_channel=f"evaluator:{content.agent_name}",
+            action_type="speak",
+            argument=json.dumps(
+                {"reward": json.dumps(result), "reward_prompt": self.reward_prompt}
+            ),
+        )
+
+    async def aevaluate(self, episode: EpisodeLog) -> dict[str, Any]:
+        # TODO: temporary implementation; replace with render_for_humans on EpisodeLog
+        history = "\n".join(f"{msg[0][0]} said: {msg[0][2]}" for msg in episode.messages)
+        agent_list = []
+        for pk in episode.agents:
+            agent = AgentProfile.get(pk)
+            name = (agent.first_name + " " + agent.last_name).strip()
+            agent_list.append(name)
+
+        res: BaseModel = await agenerate(
+            model_name=self.model_name,
+            template=self.reward_prompt,
+            input_values=dict(history=history, agent_list=str(agent_list)),
+            output_parser=PydanticOutputParser[self.response_format_class](  # type: ignore[name-defined]
+                pydantic_object=self.response_format_class
+            ),
+            temperature=self.temperature,
+        )
+        return res.model_dump()["agents_evaluation"]
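
One wire-format detail worth noting (editorial sketch, values invented): the AgentAction built in aact JSON-encodes the reward dict and then encodes it again inside the outer argument, so a consumer has to decode twice.

import json

# Mimics the argument produced by Evaluator.aact above.
argument = json.dumps({
    "reward": json.dumps({"Jane": {"goal": 7}, "Jack": {"goal": 5}}),
    "reward_prompt": "{history} ...",
})

outer = json.loads(argument)
per_agent = json.loads(outer["reward"])  # second decode for the nested string
print(per_agent["Jane"])                 # -> {'goal': 7}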
79 changes: 58 additions & 21 deletions sotopia/experimental/agents/moderator.py
@@ -34,7 +34,7 @@ def __init__(
         output_channels: list[str],
         scenario: str,
         agent_mapping: dict[str, str],
-        evaluator_channels: list[str] = [],
+        evaluator_channels: list[list[str]] = [],
         tag: str = "",
         redis_url: str = "redis://localhost:6379/0",
         action_order: Literal["simultaneous", "round-robin", "random"] = "round-robin",
@@ -48,11 +48,13 @@ def __init__(
         max_turns: int = 20,
         push_to_db: bool = False,
         use_pk_value: bool = False,
+        evaluate_episode: bool = False,
     ) -> None:
+        print([(channel[0], AgentAction) for channel in evaluator_channels])
         super().__init__(
             input_channel_types=[
                 (input_channel, AgentAction) for input_channel in input_channels
-            ],
+            ] + [(channel[0], AgentAction) for channel in evaluator_channels],
             output_channel_types=[
                 (output_channel, Observation) for output_channel in output_channels
             ],
@@ -77,10 +79,14 @@ def __init__(
         self.agent_models: dict[str, str] = {}
         self.agents_awake: dict[str, bool] = {name: False for name in self.agents}
         self.all_agents_awake: asyncio.Event = asyncio.Event()
-        self.evaluator_channels: list[str] = evaluator_channels
+        self.evaluator_channels: list[list[str]] = evaluator_channels
         self.push_to_db: bool = push_to_db
         self.use_pk_value: bool = use_pk_value
-        self.epilog: EpisodeLog = EpisodeLog(messages=[], rewards=[], rewards_prompt="")
+        self.evaluate_episode: bool = evaluate_episode
+        assert (not self.evaluate_episode) or len(evaluator_channels) > 0, "if evaluate_episode is True, evaluator_channels should not be empty"
+
+        self.epilog: EpisodeLog | None = None  # will be initialized in the booting process

         if self.action_order == "round-robin":
             pass
@@ -154,24 +160,36 @@ async def booting(self) -> None:
                 }
             )
         )
-        await asyncio.sleep(0.1)
+        await asyncio.sleep(0.2)
         while not self.observation_queue.empty():
             agent_action = await self.observation_queue.get()
-            self.agents_awake[agent_action.agent_name] = True
-            args: dict[str, Any] = json.loads(agent_action.argument)
-            self.agents_pk[agent_action.agent_name] = args["pk"]
-            self.agent_models[agent_action.agent_name] = args["model_name"]
+            if not self.agents_awake[agent_action.agent_name]:
+                self.agents_awake[agent_action.agent_name] = True
+                args: dict[str, Any] = json.loads(agent_action.argument)
+                self.agents_pk[agent_action.agent_name] = args["pk"]
+                self.agent_models[agent_action.agent_name] = args["model_name"]
         if False not in self.agents_awake.values():
             self.all_agents_awake.set()
             print("all agents are awake")

+        self.epilog = EpisodeLog(
+            environment=self.scenario,
+            agents=list(self.agents_pk.values()),
+            tag=self.tag,
+            models=list(self.agent_models.values()),
+            messages=[[("Environment", "Environment", self.scenario)]],
+            rewards=[0.0] * len(self.agents),
+            rewards_prompt="",
+        )
         if self.action_order == "round-robin":
             await self.send(
                 Observations(
                     observations_map={
                         output_channel: Observation(
                             agent_name="moderator",
-                            last_turn="conversation start",
+                            last_turn=self.scenario,
                             turn_number=0,
                             available_actions=self.available_actions
                             if agent_name == self.agents[0]
@@ -184,12 +202,18 @@ async def booting(self) -> None:
             self.current_agent_index += 1

     async def wrap_up_and_stop(self) -> None:
-        if self.evaluator_channels:
-            epilog = await self.aeval(self.epilog)
-        if self.push_to_db:
-            epilog.save()
+        try:
+            await asyncio.sleep(0.1)
+            print("all agents have left, wrap up and stop")
+            self.shutdown_event.set()  # this will disable the task scheduler
+            if self.evaluate_episode:
+                epilog = await self.aeval(self.epilog)
+            if self.push_to_db:
+                epilog.save()
+        except Exception as e:
+            print(f"error in wrap_up_and_stop: {e}")
         await asyncio.sleep(0.5)
-        print("result of this episode:\n", epilog)
+        print("result of this episode:\n", self.epilog.model_dump_json())
         await self.r.publish(
             "shutdown:moderator",
             "shutdown",
@@ -207,16 +231,30 @@ async def episode_log_to_messages(
     async def aeval(self, epilog: EpisodeLog) -> EpisodeLog:
         """
         evaluate the episode
+        will send the epilog to the evaluators and wait for the evaluation to finish
         """
+        assert len(self.evaluator_channels) == 1, "currently only support one evaluator"
+
         for evaluator_channel in self.evaluator_channels:
-            await self.r.publish(evaluator_channel, epilog.model_dump_json())
+            print(evaluator_channel[1])
+            await self.r.publish(
+                evaluator_channel[1],
+                Message[Observation](
+                    data=Observation(
+                        agent_name="moderator",
+                        last_turn=epilog.model_dump_json(),
+                        turn_number=self.turn_number,
+                        available_actions=self.available_actions,
+                    )
+                ).model_dump_json(),
+            )

         print("episode eval started")

-        for evaluator_channel in self.evaluator_channels:
-            await self.observation_queue.get()
+        for _ in range(len(self.evaluator_channels)):  # collect one reply per evaluator from the queue
+            raw_res = await self.observation_queue.get()
+            res = json.loads(raw_res.argument)
+            epilog.rewards = res["reward"]
+            epilog.rewards_prompt = res["reward_prompt"]

         print("episode eval finished")
-        epilog.rewards = [0.0] * len(self.agents)  # TODO: get real rewards
-        epilog.rewards_prompt = ""  # TODO: get real rewards_prompt
         return epilog

     async def astep(self, agent_action: AgentAction) -> Observations | None:
@@ -270,5 +308,4 @@ async def astep(self, agent_action: AgentAction) -> Observations | None:
             )
             observations_map[output_channel] = observation
         self.current_agent_index = (self.current_agent_index + 1) % len(self.agents)
-
         return Observations(observations_map=observations_map)
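
Taken together, moderator and evaluator now complete a request/response round trip over Redis: the moderator publishes the full EpisodeLog wrapped in an Observation on evaluator_channel[1], and the evaluator replies on evaluator_channel[0] with an AgentAction bundling the rewards. The sketch below is an editorial reduction, not code from the commit; it replaces the Redis channels from origin.toml with plain asyncio queues and invents the scores.

import asyncio
import json

async def main() -> None:
    to_evaluator: asyncio.Queue[str] = asyncio.Queue()  # "moderator:evaluator"
    to_moderator: asyncio.Queue[str] = asyncio.Queue()  # "evaluator:moderator"

    async def evaluator() -> None:
        epilog_json = await to_evaluator.get()          # episode log arrives
        json.loads(epilog_json)                         # parse, then judge it
        reward = {"Jane": {"goal": 7}, "Jack": {"goal": 5}}  # invented scores
        await to_moderator.put(
            json.dumps({"reward": json.dumps(reward), "reward_prompt": "..."})
        )

    async def moderator() -> None:
        await to_evaluator.put(json.dumps({"messages": []}))  # stand-in epilog
        res = json.loads(await to_moderator.get())
        print("rewards:", res["reward"])                # still JSON-encoded here

    await asyncio.gather(evaluator(), moderator())

asyncio.run(main())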
