From 793f2c09169f302cb75620eb70dcfa95de7d7bd9 Mon Sep 17 00:00:00 2001 From: Ken Collins Date: Tue, 30 Apr 2024 23:58:54 -0400 Subject: [PATCH] Switch to Streaming. --- TODO.md | 9 ++ src/experts/assistant.js | 63 +++++------- src/experts/messages.js | 68 ------------- src/experts/run.js | 176 +++++++++++++++++---------------- src/experts/thread.js | 21 ---- src/helpers.js | 13 ++- test/experts/assistant.test.js | 2 +- 7 files changed, 133 insertions(+), 219 deletions(-) delete mode 100644 src/experts/messages.js diff --git a/TODO.md b/TODO.md index 1073497..497e507 100644 --- a/TODO.md +++ b/TODO.md @@ -3,3 +3,12 @@ * Remove the Assistant#messages. Is even `Message` needed? * TODO: Revisit this and assistantsToolsOutputs. * Is `addAssistantTool` the right name now? + + +```javascript +stream.on("event", (e) => { + if (e.event.startsWith("thread.run")) { + aRun = e.data; + } +}); +``` diff --git a/src/experts/assistant.js b/src/experts/assistant.js index 63a63a0..1b88a02 100644 --- a/src/experts/assistant.js +++ b/src/experts/assistant.js @@ -1,7 +1,6 @@ import { openai } from "../openai.js"; import { debug, formatToolOutputs } from "../helpers.js"; import { Thread } from "./thread.js"; -import { Message } from "./messages.js"; import { Run } from "./run.js"; class Assistant { @@ -11,11 +10,6 @@ class Assistant { return asst; } - static async delete() { - const asst = new this(); - await asst.deleteByName(); - } - constructor(agentName, description, instructions, options = {}) { this.agentName = agentName; this.description = description; @@ -62,15 +56,19 @@ class Assistant { return this.assistant.metadata; } - // Messages: Asking and access. + // Interface async ask(message, threadID) { return await this.askAssistant(message, threadID); } - get lastMessageContent() { - return this.messages[0].content; - } + // Run Event Overrides + + onEvent(event) {} + + onTextDelta(delta, snapshot) {} + + onToolCallDelta(delta, snapshot) {} // Tool Assistant @@ -84,44 +82,27 @@ class Assistant { } } - // Tool Outputs: Used for response pass-thru. - - get isAssistantsToolsOutputs() { - return ( - this.assistantsToolsOutputs && this.assistantsToolsOutputs.length > 0 - ); - } - - addAssistantsToolsOutputs(output) { - this.assistantsToolsOutputs.push(output); - } - - clearAssistantsToolsOutputs() { - if (this.isAssistantsToolsOutputs) { - this.assistantsToolsOutputs.length = 0; - } - } - // Private async askAssistant(message, threadID) { if (!this.llm) return; - this.clearAssistantsToolsOutputs(); let thread = await Thread.find(threadID); if (this.isTool && this.hasToolThread) { thread = await thread.toolThread(this); } - const _msg = await Message.createForAssistant(this, message, thread); - const run = await Run.createForAssistant(this, thread); - let output = await run.actions(); + const messageContent = + typeof message === "string" ? message : JSON.stringify(message); + debug("💌 " + JSON.stringify(messageContent)); + await openai.beta.threads.messages.create(thread.id, { + role: "user", + content: messageContent, + }); + const run = await Run.streamForAssistant(this, thread); + let output = await run.wait(); if (this.isTool) { if (this.llm && this.ignoreLLMToolOutput) { output = ""; } - } else { - if (this.isAssistantsToolsOutputs) { - output = formatToolOutputs(this.assistantsToolsOutputs); - } } debug(`🤖 ${output}`); return output; @@ -129,11 +110,6 @@ class Assistant { // Private (Lifecycle) - async reCreate() { - const assistant = (await this.deleteByName()) || (await this.create()); - return assistant; - } - async findByID() { if (!this.id) return; const assistant = await openai.beta.assistants.retrieve(this.id); @@ -149,6 +125,11 @@ class Assistant { return assistant; } + async reCreate() { + const assistant = (await this.deleteByName()) || (await this.create()); + return assistant; + } + async create() { const assistant = await openai.beta.assistants.create({ model: this.model, diff --git a/src/experts/messages.js b/src/experts/messages.js deleted file mode 100644 index 2f5afbe..0000000 --- a/src/experts/messages.js +++ /dev/null @@ -1,68 +0,0 @@ -import { debug } from "../helpers.js"; -import { openai } from "../openai.js"; - -class Message { - static async createForAssistant(asst, content, thread) { - const contentString = - typeof content === "string" ? content : JSON.stringify(content); - const msg = await openai.beta.threads.messages.create(thread.id, { - role: "user", - content: contentString, - }); - debug("💌 " + JSON.stringify(msg)); - const message = new Message(msg, thread); - asst.messages.unshift(message); - return message; - } - - constructor(message, thread) { - this.message = message; - this.thread = thread; - } - - // Getters - - get type() { - return this.message.type; - } - - get role() { - return this.message.role; - } - - get isUser() { - return this.role === "user"; - } - - get isAssistant() { - return this.role === "assistant"; - } - - get isImageFileContent() { - return this.message.content[0].type === "image_file"; - } - - get content() { - return this.message.content - .filter((c) => c.type === "text") - .map((c) => c.text.value) - .join("\n\n"); - } - - // Assistant Response - - async assistantContent() { - if (!this.isAssistant) return; - const aContent = [this.content]; - // TODO: Handle image file content to public S3 for markdown. - // if (this.isImageFileContent) { - // const imageFileID = message.content[0].image_file.file_id; - // const imageFilePath = await downloadFile(imageFileID); - // const imageMessage = `![Image](https://example.com${imageFilePath})`; - // aContent.push(imageMessage); - // } - return aContent.join("\n\n"); - } -} - -export { Message }; diff --git a/src/experts/run.js b/src/experts/run.js index 4fb8f82..765c970 100644 --- a/src/experts/run.js +++ b/src/experts/run.js @@ -1,21 +1,33 @@ -import { debug, isDebug } from "../helpers.js"; +import { debug, messagesContent } from "../helpers.js"; import { openai } from "../openai.js"; class Run { - static async createForAssistant(assistant, thread) { - debug("ℹ️ Running..."); - const queuedRun = await openai.beta.threads.runs.create(thread.id, { + static async streamForAssistant(assistant, thread) { + debug("🦦 Streaming..."); + const stream = await openai.beta.threads.runs.stream(thread.id, { assistant_id: assistant.id, }); - const run = new Run(assistant, thread, queuedRun); - await run.wait(); - return run; + return new Run(assistant, thread, stream); } - constructor(assistant, thread, run) { + constructor(assistant, thread, stream) { this.assistant = assistant; this.thread = thread; - this.run = run; + this.stream = stream; + } + + set stream(stream) { + this._stream = stream; + this.run = stream.currentRun(); + stream.on("event", (e) => this.onEvent(e)); + stream.on("textDelta", (td, s) => this.onTextDelta(td, s)); + stream.on("toolCallDelta", (tcd, s) => this.onToolCallDelta(tcd, s)); + this.toolOutputs = []; + this.isToolOuputs = false; + } + + get stream() { + return this._stream; } get id() { @@ -26,98 +38,88 @@ class Run { return this.thread.id; } - async wait() { - let polledRun; - let isRunning = true; - while (isRunning) { - polledRun = await openai.beta.threads.runs.retrieve( - this.threadID, - this.id - ); - await this.waitTime(500); - if (!/^(queued|in_progress|cancelling)$/.test(polledRun.status)) { - if (isDebug) { - delete polledRun.instructions; - delete polledRun.description; - } - debug("🏃‍♂️ " + JSON.stringify(polledRun)); - isRunning = false; - } else { - debug("💨 " + JSON.stringify(polledRun.id)); - } - } - const completedRun = polledRun; - const runSteps = await openai.beta.threads.runs.steps.list( - this.threadID, - this.id - ); - for (const step of runSteps.data) { - debug("👣 " + JSON.stringify(step)); - } - this.run = completedRun; - return completedRun; + get messagesOutput() { + return messagesContent(this.messages || []); } - // This looks for any required actions, runs them, submits tool outputs, - // and returns the assistant's messages. - // - async actions() { - if ( + get isRequiredSubmitToolOutputs() { + return ( this.run.status === "requires_action" && this.run.required_action.type === "submit_tool_outputs" - ) { - let isToolOuputs = false; - const toolOutputs = []; - const toolCalls = this.run.required_action.submit_tool_outputs.tool_calls; - debug("🧰 " + JSON.stringify(toolCalls.map((tc) => tc.function.name))); - for (const toolCall of toolCalls) { - debug("🪚 " + JSON.stringify(toolCall)); - if (toolCall.type === "function") { - const toolOutput = { tool_call_id: toolCall.id }; - const toolCaller = - this.assistant.assistantsTools[toolCall.function.name]; - if (toolCaller && typeof toolCaller.ask === "function") { - const output = await toolCaller.ask( - toolCall.function.arguments, - this.threadID - ); - toolOutput.output = output; - isToolOuputs = true; - } - debug("🪵 " + JSON.stringify(toolOutput)); - toolOutputs.push(toolOutput); - } - } - if (isToolOuputs) { - const output = await this.submitToolOutputs(toolOutputs); + ); + } + + get toolCalls() { + return this.run.required_action.submit_tool_outputs.tool_calls; + } + + async wait() { + this.messages = await this.stream.finalMessages(); + this.run = this.stream.currentRun(); + if (this.isRequiredSubmitToolOutputs) { + await this.callTools(); + if (this.isToolOuputs) { + const output = await this.submitToolOutputs(); return output; } else { - return await this.thread.assistantMessageContent(); + return this.messagesOutput; } } else { - return await this.thread.assistantMessageContent(); + return this.messagesOutput; } } - async submitToolOutputs(toolOutputs) { - debug("🏡 Submitting outputs..."); - await openai.beta.threads.runs.submitToolOutputs(this.threadID, this.id, { - tool_outputs: toolOutputs, - }); - if (this.assistant.assistantsToolsPassOutputs) { - toolOutputs.forEach((to) => { - this.assistant.addAssistantsToolsOutputs(to.output); - }); - } - this.run = await this.wait(); - const output = await this.actions(); - return output; + // Private (Stream Event Handlers) + + onEvent(event) { + debug(`📡 Event: ${JSON.stringify(event)}`); + this.assistant.onEvent(event); } - // Private + onTextDelta(delta, snapshot) { + this.assistant.onTextDelta(delta, snapshot); + } + + onToolCallDelta(delta, snapshot) { + this.assistant.onToolCallDelta(delta, snapshot); + } - async waitTime(time) { - return new Promise((resolve) => setTimeout(resolve, time)); + // Private (Tools) + + async callTools() { + const toolCalls = this.toolCalls; + debug(`🧰 ${JSON.stringify(toolCalls.map((tc) => tc.function.name))}`); + for (const toolCall of toolCalls) { + debug("🪚 " + JSON.stringify(toolCall)); + if (toolCall.type === "function") { + const toolOutput = { tool_call_id: toolCall.id }; + const toolCaller = + this.assistant.assistantsTools[toolCall.function.name]; + if (toolCaller && typeof toolCaller.ask === "function") { + const output = await toolCaller.ask( + toolCall.function.arguments, + this.threadID + ); + toolOutput.output = output; + this.isToolOuputs = true; + } + debug("🪵 " + JSON.stringify(toolOutput)); + this.toolOutputs.push(toolOutput); + } + } + } + + async submitToolOutputs() { + debug("🏡 Submitting outputs..."); + this.stream = await openai.beta.threads.runs.submitToolOutputsStream( + this.threadID, + this.id, + { + tool_outputs: this.toolOutputs, + } + ); + const output = await this.wait(); + return output; } } diff --git a/src/experts/thread.js b/src/experts/thread.js index ceb04b3..5c7f633 100644 --- a/src/experts/thread.js +++ b/src/experts/thread.js @@ -1,6 +1,5 @@ import { debug } from "../helpers.js"; import { openai } from "../openai.js"; -import { Message } from "./messages.js"; class Thread { static async find(threadID) { @@ -49,26 +48,6 @@ class Thread { } return thread; } - - async assistantMessageContent() { - const message = await this.assistantMessage(); - return await message.assistantContent(); - } - - // Private - - // Only called after a completed run or submitting tool outputs. - // TODO: Likely will be moot with streaming and need new approach. - // - async assistantMessage() { - const options = { limit: 1 }; - const messages = await openai.beta.threads.messages.list(this.id, options); - const message = new Message(messages.data[0], this); - if (!message.isAssistant) { - throw new Error("ExpectedAssistantMessageError"); - } - return message; - } } export { Thread }; diff --git a/src/helpers.js b/src/helpers.js index 83e3ce8..f72d5f3 100644 --- a/src/helpers.js +++ b/src/helpers.js @@ -7,6 +7,17 @@ const debug = (message) => { } }; +const messagesContent = (messages) => { + return messages + .map((m) => { + return m.content + .filter((c) => c.type === "text") + .map((c) => c.text.value) + .join("\n\n"); + }) + .join("\n\n"); +}; + const formatToolOutputs = (outputs) => { const result = outputs.map((item) => { if (typeof item === "string") { @@ -18,4 +29,4 @@ const formatToolOutputs = (outputs) => { return result.join("\n\n"); }; -export { debug, isDebug, formatToolOutputs }; +export { debug, isDebug, messagesContent, formatToolOutputs }; diff --git a/test/experts/assistant.test.js b/test/experts/assistant.test.js index 4f607a9..86e0b27 100644 --- a/test/experts/assistant.test.js +++ b/test/experts/assistant.test.js @@ -41,7 +41,7 @@ describe("with vector store", () => { "Using a single word response, tell me what food source do Proxima Centauri b inhabitants migrate for?", threadID ); - expect(output).toBe("Snorgronk【4:0†oddFacts.txt】."); + expect(output).toMatch(/Snorgronk/); }); });