diff --git a/packages/@liexp/backend/src/providers/ai/langchain.provider.ts b/packages/@liexp/backend/src/providers/ai/langchain.provider.ts
index 0149563f2..22dcda7b3 100644
--- a/packages/@liexp/backend/src/providers/ai/langchain.provider.ts
+++ b/packages/@liexp/backend/src/providers/ai/langchain.provider.ts
@@ -1,5 +1,8 @@
 import { type Document as LangchainDocument } from "@langchain/core/documents";
-import { StringOutputParser } from "@langchain/core/output_parsers";
+import {
+  JsonOutputParser,
+  StringOutputParser,
+} from "@langchain/core/output_parsers";
 import { PromptTemplate } from "@langchain/core/prompts";
 import {
   RunnablePassthrough,
@@ -7,6 +10,10 @@ import {
 } from "@langchain/core/runnables";
 import { ChatOpenAI, OpenAIEmbeddings } from "@langchain/openai";
 import { GetLogger } from "@liexp/core/lib/logger/index.js";
+import {
+  type CreateEventBody,
+  type EventType,
+} from "@liexp/shared/lib/io/http/Events/index.js";
 import type * as Reader from "fp-ts/lib/Reader.js";
 import { loadSummarizationChain } from "langchain/chains";
 import { RecursiveCharacterTextSplitter } from "langchain/text_splitter";
@@ -41,7 +48,6 @@ Below you find the text you need to summarize.
 
 export interface LangchainProvider {
   chat: ChatOpenAI;
-  // embeddings: OpenAIEmbeddings;
   queryDocument: (
     url: LangchainDocument[],
     question: string,
@@ -51,6 +57,13 @@
     text: LangchainDocument[],
     options?: { model?: AvailableModels; prompt?: string; question?: string },
   ) => Promise<string>;
+  createEventFromDocuments: (
+    content: LangchainDocument[],
+    question: string,
+    type: EventType,
+    prompt: string,
+    options?: { model?: AvailableModels },
+  ) => Promise<CreateEventBody>;
 }
 
 export type AvailableModels =
@@ -111,6 +124,7 @@
     const embeddings = new OpenAIEmbeddings({
       model,
      apiKey: opts.apiKey,
+      timeout: 1000 * 60 * 30, // 30 minutes (the OpenAI client expects milliseconds)
       configuration: {
         baseURL: opts.baseURL,
       },
@@ -129,6 +143,7 @@
 
     // Retrieve and generate using the relevant snippets of the blog.
     const retriever = vectorStore.asRetriever();
+
     const prompt = PromptTemplate.fromTemplate(EMBEDDINGS_PROMPT);
 
     const ragChain = RunnableSequence.from([
@@ -190,6 +205,85 @@
 
       return output;
     },
+    createEventFromDocuments: async (
+      content,
+      question,
+      type,
+      prompt,
+      options,
+    ) => {
+      const model =
+        options?.model ?? opts.models?.embeddings ?? "text-embedding-ada-002";
+      // `options.model` overrides the embeddings model only; the chat model
+      // always comes from the provider configuration.
+      const chatModel = opts.models?.chat ?? "gpt-4o";
+
+      const embeddings = new OpenAIEmbeddings({
+        model,
+        apiKey: opts.apiKey,
+        configuration: {
+          baseURL: opts.baseURL,
+        },
+      });
+
+      const chat = new ChatOpenAI({
+        model: chatModel,
+        temperature: 0,
+        apiKey: opts.apiKey,
+        timeout: 1000 * 60 * 30, // 30 minutes, in milliseconds
+        maxRetries: 3,
+        configuration: {
+          baseURL: opts.baseURL,
+        },
+        streaming: true,
+        onFailedAttempt: (error) => {
+          langchainLogger.error.log("Failed attempt", error);
+          return error;
+        },
+      });
+
+      const textSplitter = new RecursiveCharacterTextSplitter({
+        chunkSize: 2000,
+        chunkOverlap: 100,
+      });
+      const splits = await textSplitter.splitDocuments(content);
+
+      const vectorStore = await MemoryVectorStore.fromDocuments(
+        splits,
+        embeddings,
+      );
+
+      // Retrieve and generate using the relevant snippets of the documents.
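+      // The retriever embeds each query and matches it against the split
+      // chunks in the in-memory store; `verbose: true` logs every retrieval
+      // call for easier debugging.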
+      const retriever = vectorStore.asRetriever({ verbose: true });
+
+      // Set up a parser + inject format instructions into the prompt template.
+      const formatInstructions = `Respond with a valid JSON object, containing the fields: "title" and "date", considering the content describes an event of type "${type}".`;
+      const promptTemplate = await PromptTemplate.fromTemplate(prompt).partial({
+        format_instructions: formatInstructions,
+      });
+
+      const parser = new JsonOutputParser();
+
+      const ragChain = RunnableSequence.from([
+        {
+          context: retriever.pipe(formatDocumentsAsString),
+          question: new RunnablePassthrough(),
+        },
+        promptTemplate.pipe(chat).pipe(parser),
+      ]);
+
+      // The chain input is the plain question string: the retriever embeds it
+      // for the similarity search, while `RunnablePassthrough` forwards it to
+      // the prompt as the `question` variable.
+      const stream = await ragChain.stream(question);
+
+      // `JsonOutputParser` emits progressively more complete objects while
+      // streaming, so the last chunk holds the final parsed payload.
+      let output: CreateEventBody | undefined;
+      for await (const chunk of stream) {
+        langchainLogger.debug.log("chunk", chunk);
+        output = chunk as CreateEventBody;
+      }
+
+      langchainLogger.info.log("output", output);
+
+      return output as CreateEventBody;
+    },
   };
 };
diff --git a/services/ai-bot/src/flows/ai/createEventFromText.flow.ts b/services/ai-bot/src/flows/ai/createEventFromText.flow.ts
new file mode 100644
index 000000000..5fe5a4391
--- /dev/null
+++ b/services/ai-bot/src/flows/ai/createEventFromText.flow.ts
@@ -0,0 +1,41 @@
+import { createEventFromText } from "@liexp/backend/lib/flows/ai/createEventFromText.flow.js";
+import { LoggerService } from "@liexp/backend/lib/services/logger/logger.service.js";
+import { fp, pipe } from "@liexp/core/lib/fp/index.js";
+import { type CreateEventFromTextQueueData } from "@liexp/shared/lib/io/http/Queue/CreateEventFromTextQueueData.js";
+import { toAIBotError } from "../../common/error/index.js";
+import { type ClientContext } from "../../context.js";
+import { loadDocs } from "./common/loadDocs.flow.js";
+import { getPromptFromResource } from "./prompts.js";
+import { type JobProcessRTE } from "#services/job-processor/job-processor.service.js";
+
+export const createEventFromTextFlow: JobProcessRTE<
+  CreateEventFromTextQueueData
+> = (job) => {
+  return pipe(
+    fp.RTE.Do,
+    fp.RTE.bind("docs", () => loadDocs(job)),
+    fp.RTE.bind(
+      "jsonSchema",
+      () => (ctx) =>
+        pipe(
+          ctx.endpointsRESTClient.Endpoints.Event.getList({
+            filter: { eventType: [job.data.type] },
+            sort: { field: "updatedAt", order: "DESC" },
+          }),
+          fp.TE.map((events) => events.data[0]),
+          fp.TE.mapLeft(toAIBotError),
+        ),
+    ),
+    fp.RTE.chain(({ docs, jsonSchema }) =>
+      createEventFromText(
+        docs,
+        job.data.text,
+        job.data.type,
+        job.prompt ?? getPromptFromResource(job.resource, job.type),
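+        // Assumption: the serialized event below acts as an output example
+        // for the model (the latest stored event of the requested type), not
+        // a formal JSON schema.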
+        JSON.stringify(jsonSchema),
+      ),
+    ),
+    fp.RTE.map((event) => JSON.stringify(event)),
+    LoggerService.RTE.debug("`createEventFromTextFlow` result: %O"),
+  );
+};
diff --git a/services/ai-bot/src/flows/ai/createEventFromURL.flow.ts b/services/ai-bot/src/flows/ai/createEventFromURL.flow.ts
new file mode 100644
index 000000000..2f92feba6
--- /dev/null
+++ b/services/ai-bot/src/flows/ai/createEventFromURL.flow.ts
@@ -0,0 +1,33 @@
+import { type AvailableModels } from "@liexp/backend/lib/providers/ai/langchain.provider.js";
+import { LoggerService } from "@liexp/backend/lib/services/logger/logger.service.js";
+import { fp, pipe } from "@liexp/core/lib/fp/index.js";
+import { type CreateEventFromURLQueueData } from "@liexp/shared/lib/io/http/Queue/CreateEventFromURLQueue.js";
+import { toAIBotError } from "../../common/error/index.js";
+import { loadDocs } from "./common/loadDocs.flow.js";
+import { getPromptFromResource } from "./prompts.js";
+import { type JobProcessRTE } from "#services/job-processor/job-processor.service.js";
+
+const defaultQuestion = "";
+
+export const createEventFromURLFlow: JobProcessRTE<
+  CreateEventFromURLQueueData
+> = (job) => (ctx) => {
+  return pipe(
+    loadDocs(job)(ctx),
+    fp.TE.chain((docs) =>
+      fp.TE.tryCatch(() => {
+        return ctx.langchain.createEventFromDocuments(
+          docs,
+          job.question ?? defaultQuestion,
+          job.data.type,
+          job.prompt ?? getPromptFromResource(job.resource, job.type),
+          {
+            model: ctx.config.config.localAi.models
+              ?.embeddings as AvailableModels,
+          },
+        );
+      }, toAIBotError),
+    ),
+    LoggerService.TE.debug(ctx, "`createEventFromURLFlow` result: %O"),
+  );
+};
diff --git a/services/ai-bot/src/flows/ai/jobProcessor.ts b/services/ai-bot/src/flows/ai/jobProcessor.ts
index ec0150f1e..2d27bb72d 100644
--- a/services/ai-bot/src/flows/ai/jobProcessor.ts
+++ b/services/ai-bot/src/flows/ai/jobProcessor.ts
@@ -1,12 +1,11 @@
-import { fp } from "@liexp/core/lib/fp/index.js";
 import { OpenAICreateEventFromTextType } from "@liexp/shared/lib/io/http/Queue/CreateEventFromTextQueueData.js";
 import { OpenAICreateEventFromURLType } from "@liexp/shared/lib/io/http/Queue/CreateEventFromURLQueue.js";
 import {
   OpenAIEmbeddingQueueType,
   OpenAISummarizeQueueType,
 } from "@liexp/shared/lib/io/http/Queue/index.js";
-import { type Queue } from "@liexp/shared/lib/io/http/index.js";
-import { type ClientContextRTE } from "../../types.js";
+import { createEventFromTextFlow } from "./createEventFromText.flow.js";
+import { createEventFromURLFlow } from "./createEventFromURL.flow.js";
 import { embedAndQuestionFlow } from "./embedAndQuestion.js";
 import { summarizeTextFlow } from "./summarizeTexFlow.js";
 import { GetJobProcessor } from "#services/job-processor/job-processor.service.js";
@@ -14,8 +13,6 @@ import { GetJobProcessor } from "#services/job-processor/job-processor.service.js";
 export const JobProcessor = GetJobProcessor({
   [OpenAISummarizeQueueType.value]: summarizeTextFlow,
   [OpenAIEmbeddingQueueType.value]: embedAndQuestionFlow,
-  [OpenAICreateEventFromURLType.value]: (
-    job: Queue.Queue,
-  ): ClientContextRTE => fp.RTE.right(""),
-  [OpenAICreateEventFromTextType.value]: (job: Queue.Queue) => fp.RTE.right(""),
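+  // Wire the real flows in place of the previous no-op handlers.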
+  [OpenAICreateEventFromURLType.value]: createEventFromURLFlow,
+  [OpenAICreateEventFromTextType.value]: createEventFromTextFlow,
 });
diff --git a/services/api/test/vitest.config.e2e.ts b/services/api/test/vitest.config.e2e.ts
index 7031e3aa6..1d77a95b4 100644
--- a/services/api/test/vitest.config.e2e.ts
+++ b/services/api/test/vitest.config.e2e.ts
@@ -10,6 +10,7 @@ const config = extendBaseConfig(import.meta.url, (toAlias) => ({
   globalSetup: [toAlias(`globalSetup.ts`)],
   exclude: ["**/build", "**/src/migrations", "**/src/scripts"],
   pool: "forks",
+  bail: 1,
   poolOptions: {
     forks: {
       singleFork: process.env.CI === "true" ? true : false,
diff --git a/services/worker/src/jobs/processOpenAIJobsDone.job.ts b/services/worker/src/jobs/processOpenAIJobsDone.job.ts
index 71e829a53..5c9998b47 100644
--- a/services/worker/src/jobs/processOpenAIJobsDone.job.ts
+++ b/services/worker/src/jobs/processOpenAIJobsDone.job.ts
@@ -13,6 +13,9 @@ import {
 } from "@liexp/backend/lib/services/entity-repository.service.js";
 import { LoggerService } from "@liexp/backend/lib/services/logger/logger.service.js";
 import { fp, pipe } from "@liexp/core/lib/fp/index.js";
+import { DecodeError } from "@liexp/shared/lib/io/http/Error/DecodeError.js";
+import { Event } from "@liexp/shared/lib/io/http/Events/index.js";
+import { OpenAICreateEventFromTextType } from "@liexp/shared/lib/io/http/Queue/CreateEventFromTextQueueData.js";
 import { type Queue } from "@liexp/shared/lib/io/http/index.js";
 import { Equal, type FindOptionsWhere } from "typeorm";
 import { type RTE } from "../types.js";
@@ -60,6 +63,33 @@ const processDoneJobBlockNoteResult =
     );
   };
 
+const processDoneJobEventResult =
+  (dbService: EntityRepository) =>
+  (job: Queue.Queue): RTE => {
+    return pipe(
+      fp.RTE.Do,
+      fp.RTE.bind("event", () => {
+        // The job result is expected to be the JSON payload produced by the
+        // AI bot: decode it into an `Event` before persisting it.
+        return pipe(
+          fp.RTE.of(job.result),
+          fp.RTE.chainEitherK(Event.decode),
+          fp.RTE.mapLeft((errs) => DecodeError.of("Event", errs)),
+        );
+      }),
+      fp.RTE.chain(({ event }) =>
+        // Relations arrive as bare ids: map them to `{ id }` objects so the
+        // repository can link the existing entities.
+        dbService.save([
+          {
+            ...event,
+            links: event.links.map((l) => ({ id: l })),
+            media: event.media.map((m) => ({ id: m })),
+            keywords: event.keywords.map((k) => ({ id: k })),
+            socialPosts: event.socialPosts?.map((s) => ({ id: s })),
+          },
+        ]),
+      ),
+      fp.RTE.map(() => job),
+    );
+  };
+
 export const processDoneJob = (job: Queue.Queue): RTE => {
   return pipe(
     fp.RTE.right(job),
@@ -105,6 +135,13 @@
     }
 
     if (job.resource === "events") {
+      if (OpenAICreateEventFromTextType.is(job.type)) {
+        return pipe(
+          processDoneJobEventResult(EventRepository)(job),
+          fp.RTE.map(() => job),
+        );
+      }
+
       return pipe(
         processDoneJobBlockNoteResult(EventRepository)(job),
         fp.RTE.map(() => job),