From 1c22c8afd5c49966ee0a12b47f47fdca10d67a78 Mon Sep 17 00:00:00 2001 From: Mahendra <22556323+mahendraHegde@users.noreply.github.com> Date: Sat, 18 Jan 2025 13:56:27 -0300 Subject: [PATCH] add email queue to handle email sending failures --- app/emails/email.worker.server.ts | 59 ++++++++ app/emails/mail.server.ts | 131 +++++------------- app/emails/types.ts | 20 +++ app/entry.server.tsx | 10 ++ app/modules/booking/constants.ts | 3 - app/modules/booking/email-helpers.ts | 59 ++++---- app/modules/booking/service.server.ts | 12 +- app/modules/booking/worker.server.ts | 77 ++++------ app/modules/invite/service.server.ts | 2 +- app/modules/user/utils.server.ts | 2 +- .../_layout+/account-details.general.tsx | 2 +- app/routes/_welcome+/onboarding.tsx | 2 +- app/routes/api+/stripe-webhook.ts | 2 +- app/utils/scheduler.server.ts | 5 + 14 files changed, 193 insertions(+), 193 deletions(-) create mode 100644 app/emails/email.worker.server.ts diff --git a/app/emails/email.worker.server.ts b/app/emails/email.worker.server.ts new file mode 100644 index 000000000..c37837c2a --- /dev/null +++ b/app/emails/email.worker.server.ts @@ -0,0 +1,59 @@ +import { transporter } from "~/emails/transporter.server"; +import { QueueNames, scheduler } from "~/utils/scheduler.server"; +import type { EmailPayloadType } from "./types"; +import { SMTP_FROM } from "../utils/env"; +import { ShelfError } from "../utils/error"; + +export const registerEmailWorkers = async () => { + await scheduler.work( + QueueNames.emailQueue, + { newJobCheckIntervalSeconds: 60 * 3, teamSize: 2 }, + async (job) => { + await triggerEmail(job.data); + } + ); +}; + +export const triggerEmail = async ({ + to, + subject, + text, + html, + from, + replyTo, +}: EmailPayloadType) => { + try { + // send mail with defined transport object + await transporter.sendMail({ + from: from || SMTP_FROM || `"Shelf" `, // sender address + replyTo: replyTo || "support@shelf.nu", // reply to + to, // list of receivers + subject, // Subject line + text, // plain text body + html: html || "", // html body + }); + } catch (cause) { + throw new ShelfError({ + cause, + message: "Unable to send email", + additionalData: { to, subject, from }, + label: "Email", + }); + } + + // verify connection configuration + // transporter.verify(function (error) { + // if (error) { + // // eslint-disable-next-line no-console + // console.log(error); + // } else { + // // eslint-disable-next-line no-console + // console.log("Server is ready to take our messages"); + // } + // }); + + // Message sent: + + // Preview only available when sending through an Ethereal account + // console.log("Preview URL: %s", nodemailer.getTestMessageUrl(info)); +}; diff --git a/app/emails/mail.server.ts b/app/emails/mail.server.ts index 8b227b9ef..dd2511252 100644 --- a/app/emails/mail.server.ts +++ b/app/emails/mail.server.ts @@ -1,104 +1,39 @@ -import type { Attachment } from "nodemailer/lib/mailer"; -import { transporter } from "~/emails/transporter.server"; -import { SMTP_FROM } from "../utils/env"; -import { ShelfError } from "../utils/error"; - -export const sendEmail = async ({ - to, - subject, - text, - html, - attachments, - from, - replyTo, -}: { - /** Email address of recipient */ - to: string; - - /** Subject of email */ - subject: string; - - /** Text content of email */ - text: string; - - /** HTML content of email */ - html?: string; - - attachments?: Attachment[]; - - /** Override the default sender */ - from?: string; +import { Logger } from "~/utils/logger"; +import { QueueNames, scheduler } from "~/utils/scheduler.server"; +import { triggerEmail } from "./email.worker.server"; +import type { EmailPayloadType } from "./types"; + +export const sendEmail = (payload: EmailPayloadType) => { + // attempt to send email, push to the queue if it fails + triggerEmail(payload).catch((err) => { + Logger.warn({ + err, + details: { + to: payload.to, + subject: payload.subject, + from: payload.from, + }, + message: "email sending failed, pushing to the queue", + }); + void addToQueue(payload); + }); +}; - /** Override the default reply to email address */ - replyTo?: string; -}) => { +const addToQueue = async (payload: EmailPayloadType) => { try { - // send mail with defined transport object - await transporter.sendMail({ - from: from || SMTP_FROM || `"Shelf" `, // sender address - replyTo: replyTo || "support@shelf.nu", // reply to - to, // list of receivers - subject, // Subject line - text, // plain text body - html: html || "", // html body - attachments: [...(attachments || [])], + await scheduler.send(QueueNames.emailQueue, payload, { + retryLimit: 5, + retryDelay: 5, }); - } catch (cause) { - throw new ShelfError({ - cause, - message: "Unable to send email", - additionalData: { to, subject, from }, - label: "Email", + } catch (err) { + Logger.warn({ + err, + details: { + to: payload.to, + subject: payload.subject, + from: payload.from, + }, + message: "Failed to push email payload to queue", }); } - - // verify connection configuration - // transporter.verify(function (error) { - // if (error) { - // // eslint-disable-next-line no-console - // console.log(error); - // } else { - // // eslint-disable-next-line no-console - // console.log("Server is ready to take our messages"); - // } - // }); - - // Message sent: - - // Preview only available when sending through an Ethereal account - // console.log("Preview URL: %s", nodemailer.getTestMessageUrl(info)); }; - -/** Utility function to add delay between operations */ -async function delay(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - -/** Process emails in batches with rate limiting - * @param emails - Array of email configurations to send - * @param batchSize - Number of emails to process per batch (default: 2) - * @param delayMs - Milliseconds to wait between batches (default: 1000ms) - */ -export async function sendEmailsWithRateLimit( - emails: Array<{ - to: string; - subject: string; - text: string; - html: string; - }>, - batchSize = 2, - delayMs = 1100 -): Promise { - for (let i = 0; i < emails.length; i += batchSize) { - // Process emails in batches of specified size - const batch = emails.slice(i, i + batchSize); - - // Send emails in current batch concurrently - await Promise.all(batch.map((email) => sendEmail(email))); - - // If there are more emails to process, add delay before next batch - if (i + batchSize < emails.length) { - await delay(delayMs); - } - } -} diff --git a/app/emails/types.ts b/app/emails/types.ts index 933e98f0a..788414ab9 100644 --- a/app/emails/types.ts +++ b/app/emails/types.ts @@ -16,3 +16,23 @@ export type BookingForEmail = Prisma.BookingGetPayload<{ }; }; }>; + +export type EmailPayloadType = { + /** Email address of recipient */ + to: string; + + /** Subject of email */ + subject: string; + + /** Text content of email */ + text: string; + + /** HTML content of email */ + html?: string; + + /** Override the default sender */ + from?: string; + + /** Override the default reply to email address */ + replyTo?: string; +}; diff --git a/app/entry.server.tsx b/app/entry.server.tsx index 7c0dd4e86..4e5b375e8 100644 --- a/app/entry.server.tsx +++ b/app/entry.server.tsx @@ -7,6 +7,7 @@ import { RemixServer } from "@remix-run/react"; import * as Sentry from "@sentry/remix"; import { isbot } from "isbot"; import { renderToPipeableStream } from "react-dom/server"; +import { registerEmailWorkers } from "./emails/email.worker.server"; import { registerBookingWorkers } from "./modules/booking/worker.server"; import { ShelfError } from "./utils/error"; import { Logger } from "./utils/logger"; @@ -26,6 +27,15 @@ schedulerService }) ); }); + await registerEmailWorkers().catch((cause) => { + Logger.error( + new ShelfError({ + cause, + message: "Something went wrong while registering email workers.", + label: "Scheduler", + }) + ); + }); }) .finally(() => { // eslint-disable-next-line no-console diff --git a/app/modules/booking/constants.ts b/app/modules/booking/constants.ts index 81315b9b0..d0daa383e 100644 --- a/app/modules/booking/constants.ts +++ b/app/modules/booking/constants.ts @@ -1,6 +1,3 @@ -export const schedulerKeys = { - bookingQueue: "booking-queue", -}; export enum bookingSchedulerEventsEnum { checkoutReminder = `booking-checkout-reminder`, checkinReminder = `booking-checkin-reminder`, diff --git a/app/modules/booking/email-helpers.ts b/app/modules/booking/email-helpers.ts index 6a6368f5f..030a6927b 100644 --- a/app/modules/booking/email-helpers.ts +++ b/app/modules/booking/email-helpers.ts @@ -156,44 +156,35 @@ export const checkinReminderEmailContent = ({ )}.`, }); -export async function sendCheckinReminder( +export function sendCheckinReminder( booking: BookingForEmail, assetCount: number, hints: ClientHint ) { - try { - await sendEmail({ - to: booking.custodianUser!.email, - subject: `Checkin reminder (${booking.name}) - shelf.nu`, - text: checkinReminderEmailContent({ - hints, - bookingName: booking.name, - assetsCount: assetCount, - custodian: - `${booking.custodianUser!.firstName} ${booking.custodianUser - ?.lastName}` || (booking.custodianTeamMember?.name as string), - from: booking.from!, - to: booking.to!, - bookingId: booking.id, - }), - html: bookingUpdatesTemplateString({ - booking, - heading: `Your booking is due for checkin in ${getTimeRemainingMessage( - new Date(booking.to!), - new Date() - )}.`, - assetCount, - hints, - }), - }); - } catch (cause) { - throw new ShelfError({ - cause, - message: "Something went wrong while sending the checkin reminder email", - additionalData: { booking }, - label: "Booking", - }); - } + sendEmail({ + to: booking.custodianUser!.email, + subject: `Checkin reminder (${booking.name}) - shelf.nu`, + text: checkinReminderEmailContent({ + hints, + bookingName: booking.name, + assetsCount: assetCount, + custodian: + `${booking.custodianUser!.firstName} ${booking.custodianUser + ?.lastName}` || (booking.custodianTeamMember?.name as string), + from: booking.from!, + to: booking.to!, + bookingId: booking.id, + }), + html: bookingUpdatesTemplateString({ + booking, + heading: `Your booking is due for checkin in ${getTimeRemainingMessage( + new Date(booking.to!), + new Date() + )}.`, + assetCount, + hints, + }), + }); } /** diff --git a/app/modules/booking/service.server.ts b/app/modules/booking/service.server.ts index 0bc390151..22abf96bb 100644 --- a/app/modules/booking/service.server.ts +++ b/app/modules/booking/service.server.ts @@ -10,7 +10,7 @@ import type { } from "@prisma/client"; import { db } from "~/database/db.server"; import { bookingUpdatesTemplateString } from "~/emails/bookings-updates-template"; -import { sendEmail, sendEmailsWithRateLimit } from "~/emails/mail.server"; +import { sendEmail } from "~/emails/mail.server"; import { getStatusClasses, isOneDayEvent } from "~/utils/calendar"; import { getDateTimeFormat } from "~/utils/client-hints"; import { calcTimeDifference } from "~/utils/date-fns"; @@ -21,9 +21,9 @@ import { getRedirectUrlFromRequest } from "~/utils/http"; import { getCurrentSearchParams } from "~/utils/http.server"; import { ALL_SELECTED_KEY } from "~/utils/list"; import { Logger } from "~/utils/logger"; -import { scheduler } from "~/utils/scheduler.server"; +import { QueueNames, scheduler } from "~/utils/scheduler.server"; import type { MergeInclude } from "~/utils/utils"; -import { bookingSchedulerEventsEnum, schedulerKeys } from "./constants"; +import { bookingSchedulerEventsEnum } from "./constants"; import { assetReservedEmailContent, cancelledBookingEmailContent, @@ -82,7 +82,7 @@ export async function scheduleNextBookingJob({ }) { try { const id = await scheduler.sendAfter( - schedulerKeys.bookingQueue, + QueueNames.bookingQueue, data, {}, when @@ -843,7 +843,7 @@ export async function deleteBooking( hideViewButton: true, }); - await sendEmail({ + sendEmail({ to: email, subject, text, @@ -1341,7 +1341,7 @@ export async function bulkDeleteBookings({ })); // Send emails with rate limiting - return await sendEmailsWithRateLimit(emailConfigs); + return emailConfigs.map(sendEmail); } catch (cause) { const message = cause instanceof ShelfError diff --git a/app/modules/booking/worker.server.ts b/app/modules/booking/worker.server.ts index 5d76e7a16..fe394253b 100644 --- a/app/modules/booking/worker.server.ts +++ b/app/modules/booking/worker.server.ts @@ -7,8 +7,8 @@ import { sendEmail } from "~/emails/mail.server"; import { getTimeRemainingMessage } from "~/utils/date-fns"; import { ShelfError } from "~/utils/error"; import { Logger } from "~/utils/logger"; -import { scheduler } from "~/utils/scheduler.server"; -import { bookingSchedulerEventsEnum, schedulerKeys } from "./constants"; +import { QueueNames, scheduler } from "~/utils/scheduler.server"; +import { bookingSchedulerEventsEnum } from "./constants"; import { checkoutReminderEmailContent, overdueBookingEmailContent, @@ -38,7 +38,7 @@ const checkoutReminder = async ({ data }: PgBoss.Job) => { const email = booking.custodianUser?.email; if (email && booking.from && booking.to) { - await sendEmail({ + sendEmail({ to: email, subject: `🔔 Checkout reminder (${booking.name}) - shelf.nu`, text: checkoutReminderEmailContent({ @@ -61,16 +61,6 @@ const checkoutReminder = async ({ data }: PgBoss.Job) => { assetCount: booking._count.assets, hints: data.hints, }), - }).catch((cause) => { - //lets not fail the process because of email failure - Logger.warn( - new ShelfError({ - cause, - message: "Failed to send checkout reminder email", - additionalData: { data, work: data.eventType }, - label: "Booking", - }) - ); }); } @@ -114,11 +104,7 @@ const checkinReminder = async ({ data }: PgBoss.Job) => { booking.to && booking.status === BookingStatus.ONGOING ) { - await sendCheckinReminder(booking, booking._count.assets, data.hints).catch( - (err) => { - Logger.warn(err); - } - ); + sendCheckinReminder(booking, booking._count.assets, data.hints) } //schedule the next job @@ -185,7 +171,7 @@ const overdueReminder = async ({ data }: PgBoss.Job) => { const email = booking.custodianUser?.email; if (email) { - await sendEmail({ + sendEmail({ to: email, subject: `Overdue booking (${booking.name}) - shelf.nu`, text: overdueBookingEmailContent({ @@ -222,33 +208,30 @@ const event2HandlerMap: Record< /** ===== start: listens and creates chain of jobs for a given booking ===== */ export const registerBookingWorkers = async () => { /** Check-out reminder */ - await scheduler.work( - schedulerKeys.bookingQueue, - async (job) => { - const handler = event2HandlerMap[job.data.eventType]; - if (typeof handler != "function") { - Logger.error( - new ShelfError({ - cause: null, - message: "Wrong event type received for the scheduled worker", - additionalData: { job }, - label: "Booking", - }) - ); - return; - } - try { - await handler(job); - } catch (cause) { - Logger.error( - new ShelfError({ - cause, - message: "Something went wrong while executing scheduled work.", - additionalData: { data: job.data, work: job.data.eventType }, - label: "Booking", - }) - ); - } + await scheduler.work(QueueNames.bookingQueue, async (job) => { + const handler = event2HandlerMap[job.data.eventType]; + if (typeof handler != "function") { + Logger.error( + new ShelfError({ + cause: null, + message: "Wrong event type received for the scheduled worker", + additionalData: { job }, + label: "Booking", + }) + ); + return; + } + try { + await handler(job); + } catch (cause) { + Logger.error( + new ShelfError({ + cause, + message: "Something went wrong while executing scheduled work.", + additionalData: { data: job.data, work: job.data.eventType }, + label: "Booking", + }) + ); } - ); + }); }; diff --git a/app/modules/invite/service.server.ts b/app/modules/invite/service.server.ts index cbfa65a1e..8ecd5bfde 100644 --- a/app/modules/invite/service.server.ts +++ b/app/modules/invite/service.server.ts @@ -250,7 +250,7 @@ export async function createInvite( expiresIn: `${INVITE_EXPIRY_TTL_DAYS}d`, }); //keep only needed data in token to maintain the size - await sendEmail({ + sendEmail({ to: inviteeEmail, subject: `✉️ You have been invited to ${invite.organization.name}`, text: inviteEmailText({ invite, token }), diff --git a/app/modules/user/utils.server.ts b/app/modules/user/utils.server.ts index ae849efe4..7de199b63 100644 --- a/app/modules/user/utils.server.ts +++ b/app/modules/user/utils.server.ts @@ -108,7 +108,7 @@ export async function resolveUserAction( }); }); - await sendEmail({ + sendEmail({ to: user.email, subject: `Access to ${org.name} has been revoked`, text: revokeAccessEmailText({ orgName: org.name }), diff --git a/app/routes/_layout+/account-details.general.tsx b/app/routes/_layout+/account-details.general.tsx index a88671225..e97ddb1ec 100644 --- a/app/routes/_layout+/account-details.general.tsx +++ b/app/routes/_layout+/account-details.general.tsx @@ -244,7 +244,7 @@ export async function action({ context, request }: ActionFunctionArgs) { } // Send email with OTP using our email service - await sendEmail({ + sendEmail({ to: newEmail, subject: `🔐 Shelf verification code: ${linkData.properties.email_otp}`, text: changeEmailAddressTextEmail({ diff --git a/app/routes/_welcome+/onboarding.tsx b/app/routes/_welcome+/onboarding.tsx index 373bd2f60..cd5c9cf00 100644 --- a/app/routes/_welcome+/onboarding.tsx +++ b/app/routes/_welcome+/onboarding.tsx @@ -167,7 +167,7 @@ export async function action({ context, request }: ActionFunctionArgs) { if (config.sendOnboardingEmail) { /** Send onboarding email */ - await sendEmail({ + sendEmail({ from: SMTP_FROM || `"Carlos from shelf.nu" `, replyTo: "carlos@shelf.nu", to: user.email, diff --git a/app/routes/api+/stripe-webhook.ts b/app/routes/api+/stripe-webhook.ts index 2aed4f9b7..ef189078a 100644 --- a/app/routes/api+/stripe-webhook.ts +++ b/app/routes/api+/stripe-webhook.ts @@ -285,7 +285,7 @@ export async function action({ request }: ActionFunctionArgs) { }); }); - await sendEmail({ + sendEmail({ to: user.email, subject: "Your shelf.nu free trial is ending soon", text: trialEndsSoonText({ diff --git a/app/utils/scheduler.server.ts b/app/utils/scheduler.server.ts index ca819d258..4498c30e7 100644 --- a/app/utils/scheduler.server.ts +++ b/app/utils/scheduler.server.ts @@ -1,6 +1,11 @@ import PgBoss from "pg-boss"; import { DATABASE_URL, NODE_ENV } from "../utils/env"; +export enum QueueNames { + emailQueue = "email-queue", + bookingQueue = "booking-queue", +} + let scheduler!: PgBoss; declare global {