-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathchat_wit.py
254 lines (214 loc) · 11.1 KB
/
chat_wit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
import logging
from aos.grit import *
from aos.wit import *
from agents.jetpack.messages import *
from agents.jetpack.coder.coder_wit import create_coder_actor
from agents.jetpack.coder.retriever_wit import create_retriever_actor
from agents.jetpack.chat.chat_completions import chat_completion
#========================================================================================
# Setup & State
#
# A "chat" limits a conversation to a single topic and goal.
# Each chat corresponds to a single chat window in Jetpack.
#========================================================================================
# Module-level logger, named after this module; used by every handler in this file.
logger = logging.getLogger(__name__)
async def create_chat_actor(
        ctx: MessageContext,
        name: str = "Main",
        ) -> ActorId:
    """Create a new chat actor from the "chat" prototype.

    Args:
        ctx: Context of the message currently being handled; its discovery,
            request_response and store members are used to create the actor.
        name: Name stored in the initial ChatState of the new actor.

    Returns:
        The id of the newly created actor.

    Raises:
        Exception: If no prototype named "chat" can be found.
    """
    # initial state handed to the new actor
    initial_state = ChatState()
    initial_state.name = name

    chat_prototype = await ctx.discovery.find_prototype("chat")
    if chat_prototype is not None:
        return await create_actor_from_prototype_with_state(
            chat_prototype,
            initial_state,
            ctx.request_response,
            ctx.store,
        )
    raise Exception("chat prototype not found")
class ChatState(WitState):
    """State of a single chat actor, as read and written by the handlers in this file."""
    # display name of the chat; used as prefix in log messages
    name:str="Main"
    # id of the retriever actor, set in the genesis handler
    retriever:ActorId|None = None
    # id of the coder actor, set in the genesis handler
    coder:ActorId|None = None
    # last CodeRequest that was sent to the retriever
    code_request:CodeRequest|None = None
    # payload of the last "code_planned" callback
    code_plan:CodePlanned|None = None
    # payload of the last "code_speced" callback; also passed to chat_completion
    code_spec:CodeSpec|None = None
    # payload of the last "code_deployed" callback
    code_deploy:CodeDeployed|None = None
    # payload of the last "code_failed" callback
    code_fail:CodeFailed|None = None
    # execution that was sent to the coder; reset to None on "code_executed"
    current_execution:CodeExecution|None = None
    # payload of the last "code_executed" callback
    last_result:CodeExecuted|None = None
#========================================================================================
# Wit
#========================================================================================
# Wit instance; the handlers below are registered on it through its decorators.
app = Wit()
@app.genesis_message
async def on_genesis(msg: InboxMessage, ctx: MessageContext, state: ChatState) -> None:
    """Handle the genesis message by creating the downstream actors.

    Creates a coder actor and a retriever actor and stores their ids in
    ``state.coder`` and ``state.retriever``.
    """
    logger.info("received genesis")

    # The coder is created first because the retriever is told to forward to it.
    coder_actor = await create_coder_actor(ctx, name=f"{state.name} Coder")
    state.coder = coder_actor
    logger.info(f"'{state.name}': created coder actor {coder_actor.hex()}")

    retriever_actor = await create_retriever_actor(ctx, forward_to=state.coder)
    state.retriever = retriever_actor
    logger.info(f"'{state.name}': created retriever actor {retriever_actor.hex()}")
@app.message("web")
async def on_message_web(content: str, ctx: MessageContext, state: ChatState) -> None:
    """Store a message coming from the web frontend and kick off a completion.

    The message is saved in the core's "messages" tree, a "receipt" reply with
    the message id is queued, and, if the message is from 'user', a
    "complete" message is sent to this actor together with a "thinking" reply.
    """
    logger.info(f"'{state.name}': received message from user")

    # persist the message in the core
    history = await ctx.core.gett("messages")
    user_message = ChatMessage.from_user(content)
    message_key = str(user_message.id)
    history.makeb(message_key, True).set_as_json(user_message)

    # tell the web frontend that the message arrived (will be delivered via SSE)
    ctx.outbox.add_reply_msg(ctx.message, message_key, mt="receipt")

    # only a message from the user starts the models
    if user_message.from_name != 'user':
        logger.info(f"'{state.name}': last web message was not from 'user', but was from '{user_message.from_name}', will skip.")
        return

    ctx.outbox.add_new_msg(ctx.actor_id, "complete", mt="complete")
    ctx.outbox.add_reply_msg(ctx.message, "Thinking", mt="thinking")
#========================================================================================
# Frontend States
#========================================================================================
@app.message("complete")
async def on_complete_message(msg:InboxMessage, ctx:MessageContext, state:ChatState) -> None:
    """Run a chat completion over the stored history and act on its result.

    The handler loads the "messages" tree from the core and only continues
    when the most recent message is from 'user'. The type of the completion
    result decides what happens next:

    * ``str``: becomes a normal chat message from this actor.
    * ``CodeRequest``: is sent to the retriever actor (mt="request"), stored
      in ``state.code_request`` and announced in the chat.
    * ``CodeExecution``: is sent to the coder actor (mt="execute"), stored in
      ``state.current_execution`` and announced in the chat. Skipped while
      ``state.code_spec`` is None.
    * any other type: logged and skipped.

    For every handled result the chat message is saved in the history and a
    "receipt" message carrying its id is sent to ``ctx.agent_id``.
    """
    logger.info(f"'{state.name}': completion")

    # load message history
    messages_tree = await ctx.core.gett("messages")
    messages = await ChatMessage.load_from_tree(messages_tree)
    if len(messages) == 0:
        return

    # ensure that the last message was from the user
    last_message = messages[-1]
    if last_message.from_name != 'user':
        logger.info(f"'{state.name}': last message was not from 'user', but was from '{last_message.from_name}', will skip.")
        return

    # chat completion, and process result
    result = await chat_completion(messages, state.code_spec)

    if isinstance(result, str):
        logger.info(f"'{state.name}': completion is normal chat message.")
        chat_message = ChatMessage.from_actor(result, ctx.actor_id)

    elif isinstance(result, CodeRequest):
        logger.info(f"'{state.name}': completion is a CodeRequest.")
        # message the retriever actor to generate the code
        if state.retriever is not None:
            ctx.outbox.add_new_msg(state.retriever, result, mt="request")
            logger.info(f"'{state.name}': sent CodeRequest mesage to retriever: {state.retriever.hex()}")
        else:
            logger.info(f"'{state.name}': retriever peer not found")
            return

        # the wording depends on whether this is the first request of the chat
        # (`text` instead of `msg`, so the handler's `msg` parameter is not shadowed)
        if state.code_request is None:
            text = "I will generate the requested functionality:"
        else:
            text = "I will generate the requested changes:"
        text += f"\n```\n{result.task_description}\n```\n"
        if result.input_examples is not None:
            text += f"Here are some examples for the function input:\n```\n{result.input_examples}\n```\n"
        state.code_request = result

        # create a chat message for the frontend
        chat_message = ChatMessage.from_actor(text, ctx.actor_id)
        ctx.outbox.add_new_msg(ctx.agent_id, "Start Coding", mt="thinking")

    elif isinstance(result, CodeExecution):
        logger.info(f"'{state.name}': completion is a CodeExecution.")
        if state.code_spec is None:
            logger.info(f"'{state.name}': code_spec is None, will skip.")
            return
        # message the coder actor to execute the code
        if state.coder is not None:
            ctx.outbox.add_new_msg(state.coder, result, mt="execute")
            logger.info(f"'{state.name}': sent CodeExecution mesage to coder: {state.coder.hex()}")
        else:
            logger.info(f"'{state.name}': coder peer not found")
            return

        text = "I will call the function with the following arguments:"
        if result.input_arguments is not None:
            text += f"\n```\n{result.input_arguments}\n```\n"
        state.current_execution = result

        # create a chat message for the frontend
        chat_message = ChatMessage.from_actor(text, ctx.actor_id)
        ctx.outbox.add_new_msg(ctx.agent_id, "Executing", mt="thinking")

    else:
        # Without this branch `chat_message` would be unbound below and the
        # handler would fail with an UnboundLocalError.
        logger.warning(f"'{state.name}': completion returned unsupported result type '{type(result).__name__}', will skip.")
        return

    # save message in history
    messages_tree.makeb(str(chat_message.id), True).set_as_json(chat_message)
    # notify the web frontend that the message was generated (will be delivered via SSE)
    logger.info(f"'{state.name}': send receipt to web frontend")
    ctx.outbox.add_new_msg(ctx.agent_id, str(chat_message.id), mt="receipt")
#========================================================================================
# Coder Callbacks
#========================================================================================
@app.message("code_speced")
async def on_message_code_speced(spec: CodeSpec, ctx: MessageContext, state: ChatState) -> None:
    """Callback from the coder: remember the new spec and signal an artifact update.

    Stores ``spec`` in ``state.code_spec`` and sends an "artifact" message to
    ``ctx.agent_id``.
    """
    state.code_spec = spec
    logger.info(f"'{state.name}': received callback: code_speced")
    ctx.outbox.add_new_msg(
        ctx.agent_id,
        "artifact",
        mt="artifact",
    )
@app.message("code_planned")
async def on_message_code_planned(plan: CodePlanned, ctx: MessageContext, state: ChatState) -> None:
    """Callback from the coder: remember the plan and signal an artifact update.

    Stores ``plan`` in ``state.code_plan``. An "artifact" message is sent to
    ``ctx.agent_id`` only when the stored plan actually contains a plan.
    """
    logger.info(f"'{state.name}': received callback: code_planned")
    state.code_plan = plan

    # nothing to show when the callback carries no plan
    if state.code_plan.plan is None:
        return

    #todo: also output as chat message
    ctx.outbox.add_new_msg(
        ctx.agent_id,
        "artifact",
        mt="artifact",
    )
@app.message("code_deployed")
async def on_message_code_deployed(code:CodeDeployed, ctx:MessageContext, state:ChatState) -> None:
    """Callback from the coder: remember the deployment and announce it in the chat.

    Stores ``code`` in ``state.code_deploy``. When the deployment carries
    code, a chat message announcing it is saved in the "messages" tree, and a
    "receipt" (with the chat message id) plus an "artifact" message are sent
    to ``ctx.agent_id``.
    """
    logger.info(f"'{state.name}': received callback: code_deployed")
    state.code_deploy = code

    # nothing to announce when no code was deployed
    if state.code_deploy.code is None:
        return

    # notify frontend (user-facing text; fixed the "Igenerated" typo)
    chat_message = ChatMessage.from_actor(
        "**I generated the function and deployed it.** \nYou can view it in the artifacts on the right.",
        ctx.actor_id)
    messages_tree = await ctx.core.gett("messages")
    messages_tree.makeb(str(chat_message.id), True).set_as_json(chat_message)
    ctx.outbox.add_new_msg(ctx.agent_id, str(chat_message.id), mt="receipt")
    ctx.outbox.add_new_msg(ctx.agent_id, "artifact", mt="artifact")
@app.message("code_executed")
async def on_message_code_executed(exec:CodeExecuted, ctx:MessageContext, state:ChatState) -> None:
    """Callback from the coder: turn an execution result into a chat message.

    Clears ``state.current_execution`` and stores ``exec`` in
    ``state.last_result``. Every output value that is an object id string
    pointing to a blob with a known content type is rendered (image, text or
    JSON), listed as a link, and removed from ``exec.output``. Whatever is
    left in ``exec.output`` afterwards is appended verbatim. The resulting
    chat message is saved in the "messages" tree, and a "receipt" (with the
    chat message id) plus an "artifact" message are sent to ``ctx.agent_id``.

    Note: the parameter name ``exec`` shadows the builtin; it is kept so the
    handler signature stays unchanged.
    """
    # `json` is not imported explicitly at the top of this file; import it
    # here so the JSON branch does not depend on a star import providing it.
    import json

    logger.info(f"'{state.name}': received callback: code_executed")
    state.current_execution = None
    state.last_result = exec

    parts = []
    links = []
    # Iterate over a snapshot of the keys: rendered entries are popped from
    # exec.output inside the loop.
    for key in list(exec.output):
        value = exec.output[key]
        if not (isinstance(value, str) and is_object_id_str(value)):
            continue
        obj_id_str = value
        obj_id = to_object_id(obj_id_str)

        #figure out what's in the object
        obj = await ctx.store.load(obj_id)
        if not is_blob(obj):
            continue
        blob = BlobObject(obj, obj_id)

        #see if the content type is set, falling back on the short "ct" header
        content_type = blob.get_header("Content-Type")
        if content_type is None:
            short_type = blob.get_header("ct")
            if short_type == "s":
                content_type = "text/plain"
            elif short_type == "j":
                content_type = "application/json"
        if content_type is None:
            continue

        #if the content type is an image, then display it as an image
        if content_type.startswith("image/"):
            image_url = f"../../../../grit/objects/{obj_id_str}"
            parts.append(f"Here is the image I generated:\n![]({image_url})\n")
        #if the content type is text, then display it as text
        elif content_type.startswith("text/"):
            parts.append(f"Here is what I generated:\n\n{blob.get_as_str()}\n")
        #if the content type is json, then display it as json
        elif content_type.startswith("application/json"):
            parts.append(f"Here is the JSON I generated:\n```\n{json.dumps(blob.get_as_json(), indent=2)}\n```\n")

        links.append(f"[{key}: {obj_id_str}](../../../../grit/objects/{obj_id_str})")
        #pop from the dictionary, so it is not repeated in the raw result below
        exec.output.pop(key)

    if len(links) > 0:
        parts.append("Here are the links to the generated objects:\n")
        parts.extend(f"- {link}\n" for link in links)

    if len(exec.output) > 0:
        parts.append(f"Here is the result from the execution:\n```\n{exec.output}\n```")

    content = "".join(parts)
    chat_message = ChatMessage.from_actor(content, ctx.actor_id)
    messages_tree = await ctx.core.gett("messages")
    messages_tree.makeb(str(chat_message.id), True).set_as_json(chat_message)
    ctx.outbox.add_new_msg(ctx.agent_id, str(chat_message.id), mt="receipt")
    ctx.outbox.add_new_msg(ctx.agent_id, "artifact", mt="artifact")
@app.message("code_failed")
async def on_message_code_failed(fail: CodeFailed, ctx: MessageContext, state: ChatState) -> None:
    """Callback from the coder: remember the failure and signal an artifact update.

    Stores ``fail`` in ``state.code_fail``. An "artifact" message is sent to
    ``ctx.agent_id`` only when the stored failure carries errors.
    """
    logger.warning(f"'{state.name}': received callback: code_failed")
    state.code_fail = fail

    # nothing to show when the callback carries no errors
    if state.code_fail.errors is None:
        return

    #todo: also output as chat message
    ctx.outbox.add_new_msg(
        ctx.agent_id,
        "artifact",
        mt="artifact",
    )