Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ai-bot): create event from url job #2078

Merged
merged 1 commit into from
Feb 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 96 additions & 2 deletions packages/@liexp/backend/src/providers/ai/langchain.provider.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import { type Document as LangchainDocument } from "@langchain/core/documents";
import { StringOutputParser } from "@langchain/core/output_parsers";
import {
JsonOutputParser,
StringOutputParser,
} from "@langchain/core/output_parsers";
import { PromptTemplate } from "@langchain/core/prompts";
import {
RunnablePassthrough,
RunnableSequence,
} from "@langchain/core/runnables";
import { ChatOpenAI, OpenAIEmbeddings } from "@langchain/openai";
import { GetLogger } from "@liexp/core/lib/logger/index.js";
import {
type CreateEventBody,
type EventType,
} from "@liexp/shared/lib/io/http/Events/index.js";
import type * as Reader from "fp-ts/lib/Reader.js";
import { loadSummarizationChain } from "langchain/chains";
import { RecursiveCharacterTextSplitter } from "langchain/text_splitter";
Expand Down Expand Up @@ -41,7 +48,6 @@ Below you find the text you need to summarize.

export interface LangchainProvider {
chat: ChatOpenAI;
// embeddings: OpenAIEmbeddings;
queryDocument: (
url: LangchainDocument[],
question: string,
Expand All @@ -51,6 +57,13 @@ export interface LangchainProvider {
text: LangchainDocument[],
options?: { model?: AvailableModels; prompt?: string; question?: string },
) => Promise<string>;
createEventFromDocuments: (
content: LangchainDocument[],
question: string,
type: EventType,
prompt: string,
options?: { model?: AvailableModels },
) => Promise<string>;
}

export type AvailableModels =
Expand Down Expand Up @@ -111,6 +124,7 @@ export const GetLangchainProvider = (
const embeddings = new OpenAIEmbeddings({
model,
apiKey: opts.apiKey,
timeout: 60 * 30, // 30 minutes
configuration: {
baseURL: opts.baseURL,
},
Expand All @@ -129,6 +143,7 @@ export const GetLangchainProvider = (

// Retrieve and generate using the relevant snippets of the blog.
const retriever = vectorStore.asRetriever();

const prompt = PromptTemplate.fromTemplate(EMBEDDINGS_PROMPT);

const ragChain = RunnableSequence.from([
Expand Down Expand Up @@ -190,6 +205,85 @@ export const GetLangchainProvider = (

return output;
},
/**
 * Builds a RAG chain over the given documents and asks the chat model to
 * produce a structured event (JSON with "title" and "date") of the given
 * `type`, answering `question` using the documents as retrieval context.
 *
 * Returns the parsed event serialized back to a JSON string.
 */
createEventFromDocuments: async (
  content,
  question,
  type,
  prompt,
  options,
) => {
  // Embedding model used to index the source documents for retrieval.
  const model =
    options?.model ?? opts.models?.embeddings ?? "text-embedding-ada-002";
  // NOTE(review): `options?.model` is reused as the chat model fallback, so a
  // caller passing an embeddings model name will also use it for chat —
  // confirm this is intentional.
  const chatModel = options?.model ?? opts.models?.chat ?? "gpt-4o";

  const embeddings = new OpenAIEmbeddings({
    model,
    apiKey: opts.apiKey,
    configuration: {
      baseURL: opts.baseURL,
    },
  });

  const chat = new ChatOpenAI({
    model: chatModel,
    temperature: 0,
    apiKey: opts.apiKey,
    // `timeout` is milliseconds: 30 minutes.
    // (the original `60 * 30` was only 1.8 seconds)
    timeout: 1000 * 60 * 30,
    maxRetries: 3,
    configuration: {
      baseURL: opts.baseURL,
    },
    streaming: true,
    onFailedAttempt: (error) => {
      langchainLogger.error.log("Failed attempt", error);
      return error;
    },
  });

  // Split the documents into overlapping chunks and index them in an
  // in-memory vector store so the retriever can pick relevant context.
  const textSplitter = new RecursiveCharacterTextSplitter({
    chunkSize: 2000,
    chunkOverlap: 100,
  });
  const splits = await textSplitter.splitDocuments(content);

  const vectorStore = await MemoryVectorStore.fromDocuments(
    splits,
    embeddings,
  );

  const retriever = vectorStore.asRetriever({ verbose: true });

  const formatInstructions = `Respond with a valid JSON object, containing the fields: "title" and "date", considering the content describes an event of type "${type}".`;
  const promptTemplate = await PromptTemplate.fromTemplate(prompt).partial({
    format_instructions: formatInstructions,
  });

  // Parse the model output as JSON shaped like `CreateEventBody`.
  const parser = new JsonOutputParser<CreateEventBody>();

  const ragChain = RunnableSequence.from([
    {
      context: retriever.pipe(formatDocumentsAsString),
      question: new RunnablePassthrough(),
    },
    promptTemplate.pipe(chat).pipe(parser),
  ]);

  const stream = await ragChain.stream({
    question,
  });

  // `JsonOutputParser` streams progressively more complete versions of the
  // parsed object, so the last chunk is the final result. The original code
  // did `output.push(chunk)` on an uninitialized variable, which threw a
  // TypeError on the first chunk, and would have serialized an array of
  // partial objects instead of the event itself.
  let output: CreateEventBody | undefined;
  for await (const chunk of stream) {
    langchainLogger.debug.log("chunk", chunk);
    output = chunk;
  }

  langchainLogger.info.log("output", output);

  return JSON.stringify(output);
},
};
};

Expand Down
41 changes: 41 additions & 0 deletions services/ai-bot/src/flows/ai/createEventFromText.flow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { createEventFromText } from "@liexp/backend/lib/flows/ai/createEventFromText.flow.js";
import { LoggerService } from "@liexp/backend/lib/services/logger/logger.service.js";
import { fp, pipe } from "@liexp/core/lib/fp/index.js";
import { type CreateEventFromTextQueueData } from "@liexp/shared/lib/io/http/Queue/CreateEventFromTextQueueData.js";
import { toAIBotError } from "../../common/error/index.js";
import { type ClientContext } from "../../context.js";
import { loadDocs } from "./common/loadDocs.flow.js";
import { getPromptFromResource } from "./prompts.js";
import { type JobProcessRTE } from "#services/job-processor/job-processor.service.js";

/**
 * Job processor that turns a free-text job payload into a structured event.
 *
 * Steps:
 *  1. `loadDocs` loads the job content as Langchain documents.
 *  2. Fetches the most recently updated event of the same type from the
 *     REST API and passes it (stringified) to the flow.
 *  3. Delegates to the backend `createEventFromText` flow and returns the
 *     resulting event serialized as a JSON string.
 */
export const createEventFromTextFlow: JobProcessRTE<
CreateEventFromTextQueueData
> = (job) => {
return pipe(
fp.RTE.Do,
fp.RTE.bind("docs", () => loadDocs(job)),
fp.RTE.bind(
"jsonSchema",
() => (ctx) =>
pipe(
// NOTE(review): "jsonSchema" is actually the latest existing event of
// this type, not a schema — presumably used as an example for the
// model. Confirm the naming is intentional.
ctx.endpointsRESTClient.Endpoints.Event.getList({
filter: { eventType: [job.data.type] },
sort: { field: "updatedAt", order: "DESC" },
}),
// Take the most recent event; undefined when no events exist yet.
fp.TE.map((events) => events.data[0]),
fp.TE.mapLeft(toAIBotError),
),
),
fp.RTE.chain(({ docs, jsonSchema }) =>
createEventFromText<ClientContext>(
docs,
job.data.text,
job.data.type,
// Job-specific prompt when provided, otherwise a default derived from
// the job's resource and type.
job.prompt ?? getPromptFromResource(job.resource, job.type),
JSON.stringify(jsonSchema),
),
),
fp.RTE.map((event) => JSON.stringify(event)),
LoggerService.RTE.debug("`createEventFromTextFlow` result: %O"),
);
};
33 changes: 33 additions & 0 deletions services/ai-bot/src/flows/ai/createEventFromURL.flow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { type AvailableModels } from "@liexp/backend/lib/providers/ai/langchain.provider.js";
import { LoggerService } from "@liexp/backend/lib/services/logger/logger.service.js";
import { fp, pipe } from "@liexp/core/lib/fp/index.js";
import { type CreateEventFromURLQueueData } from "@liexp/shared/lib/io/http/Queue/CreateEventFromURLQueue.js";
import { toAIBotError } from "../../common/error/index.js";
import { loadDocs } from "./common/loadDocs.flow.js";
import { getPromptFromResource } from "./prompts.js";
import { type JobProcessRTE } from "#services/job-processor/job-processor.service.js";

// Fallback question used when the job does not carry one.
const defaultQuestion = "";

/**
 * Job processor that creates a structured event from the content fetched
 * from a URL: loads the documents, then asks the langchain provider to
 * extract an event of `job.data.type` from them.
 */
export const createEventFromURLFlow: JobProcessRTE<
  CreateEventFromURLQueueData
> = (job) => (ctx) => {
  return pipe(
    loadDocs(job)(ctx),
    fp.TE.chain((docs) =>
      fp.TE.tryCatch(() => {
        return ctx.langchain.createEventFromDocuments(
          docs,
          job.question ?? defaultQuestion,
          job.data.type,
          // Job-specific prompt when provided, otherwise a default derived
          // from the job's resource and type.
          job.prompt ?? getPromptFromResource(job.resource, job.type),
          {
            // NOTE(review): this passes the *embeddings* model name, which
            // the provider also uses as its chat-model fallback — confirm
            // this is intended.
            model: ctx.config.config.localAi.models
              ?.embeddings as AvailableModels,
          },
        );
      }, toAIBotError),
    ),
    // Fixed log label: it previously said "createEventFlow", which does not
    // match this function's name and made debug output misleading.
    LoggerService.TE.debug(ctx, "`createEventFromURLFlow` result: %O"),
  );
};
11 changes: 4 additions & 7 deletions services/ai-bot/src/flows/ai/jobProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import { fp } from "@liexp/core/lib/fp/index.js";
import { OpenAICreateEventFromTextType } from "@liexp/shared/lib/io/http/Queue/CreateEventFromTextQueueData.js";
import { OpenAICreateEventFromURLType } from "@liexp/shared/lib/io/http/Queue/CreateEventFromURLQueue.js";
import {
OpenAIEmbeddingQueueType,
OpenAISummarizeQueueType,
} from "@liexp/shared/lib/io/http/Queue/index.js";
import { type Queue } from "@liexp/shared/lib/io/http/index.js";
import { type ClientContextRTE } from "../../types.js";
import { createEventFromTextFlow } from "./createEventFromText.flow.js";
import { createEventFromURLFlow } from "./createEventFromURL.flow.js";
import { embedAndQuestionFlow } from "./embedAndQuestion.js";
import { summarizeTextFlow } from "./summarizeTexFlow.js";
import { GetJobProcessor } from "#services/job-processor/job-processor.service.js";

// Maps each OpenAI queue job type to the flow that processes it. The
// create-event entries previously returned a stub (`fp.RTE.right("")`) and
// are now wired to their real flows.
export const JobProcessor = GetJobProcessor({
[OpenAISummarizeQueueType.value]: summarizeTextFlow,
[OpenAIEmbeddingQueueType.value]: embedAndQuestionFlow,
[OpenAICreateEventFromURLType.value]: createEventFromURLFlow,
[OpenAICreateEventFromTextType.value]: createEventFromTextFlow,
});
1 change: 1 addition & 0 deletions services/api/test/vitest.config.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const config = extendBaseConfig(import.meta.url, (toAlias) => ({
globalSetup: [toAlias(`globalSetup.ts`)],
exclude: ["**/build", "**/src/migrations", "**/src/scripts"],
pool: "forks",
bail: 1,
poolOptions: {
forks: {
singleFork: process.env.CI === "true" ? true : false,
Expand Down
37 changes: 37 additions & 0 deletions services/worker/src/jobs/processOpenAIJobsDone.job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import {
} from "@liexp/backend/lib/services/entity-repository.service.js";
import { LoggerService } from "@liexp/backend/lib/services/logger/logger.service.js";
import { fp, pipe } from "@liexp/core/lib/fp/index.js";
import { DecodeError } from "@liexp/shared/lib/io/http/Error/DecodeError.js";
import { Event } from "@liexp/shared/lib/io/http/Events/index.js";
import { OpenAICreateEventFromTextType } from "@liexp/shared/lib/io/http/Queue/CreateEventFromTextQueueData.js";
import { type Queue } from "@liexp/shared/lib/io/http/index.js";
import { Equal, type FindOptionsWhere } from "typeorm";
import { type RTE } from "../types.js";
Expand Down Expand Up @@ -60,6 +63,33 @@ const processDoneJobBlockNoteResult =
);
};

/**
 * Persists the event produced by a finished AI job.
 *
 * Decodes `job.result` as an `Event` (failing with a `DecodeError` when the
 * payload does not match the codec), then saves it, converting the relation
 * id arrays (links/media/keywords/socialPosts) into `{ id }` partials so the
 * ORM links existing rows instead of inserting new ones.
 *
 * Returns the original job unchanged on success.
 */
const processDoneJobEventResult =
(dbService: EntityRepository<EventV2Entity>) =>
(job: Queue.Queue): RTE<Queue.Queue> => {
return pipe(
fp.RTE.Do,
fp.RTE.bind("event", () => {
return pipe(
fp.RTE.of(job.result),
// Validate the AI output before touching the database.
fp.RTE.chainEitherK(Event.decode),
fp.RTE.mapLeft((errs) => DecodeError.of("Event", errs)),
);
}),
fp.RTE.chain(({ event }) =>
dbService.save([
{
...event,
// Relation fields arrive as plain id arrays; wrap them as partial
// entities so the repository attaches existing records by id.
links: event.links.map((l) => ({ id: l })),
media: event.media.map((m) => ({ id: m })),
keywords: event.keywords.map((k) => ({ id: k })),
socialPosts: event.socialPosts?.map((s) => ({ id: s })),
},
]),
),
// Discard the save result; the caller only needs the job back.
fp.RTE.map(() => job),
);
};

export const processDoneJob = (job: Queue.Queue): RTE<Queue.Queue> => {
return pipe(
fp.RTE.right(job),
Expand Down Expand Up @@ -105,6 +135,13 @@ export const processDoneJob = (job: Queue.Queue): RTE<Queue.Queue> => {
}

if (job.resource === "events") {
if (OpenAICreateEventFromTextType.is(job.type)) {
return pipe(
processDoneJobEventResult(EventRepository)(job),
fp.RTE.map(() => job),
);
}

return pipe(
processDoneJobBlockNoteResult(EventRepository)(job),
fp.RTE.map(() => job),
Expand Down
Loading