Skip to content

Commit

Permalink
add email queue to handle email sending failures
Browse files Browse the repository at this point in the history
  • Loading branch information
mahendraHegde committed Jan 18, 2025
1 parent 8e3e845 commit 1c22c8a
Show file tree
Hide file tree
Showing 14 changed files with 193 additions and 193 deletions.
59 changes: 59 additions & 0 deletions app/emails/email.worker.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { transporter } from "~/emails/transporter.server";
import { QueueNames, scheduler } from "~/utils/scheduler.server";
import type { EmailPayloadType } from "./types";
import { SMTP_FROM } from "../utils/env";
import { ShelfError } from "../utils/error";

export const registerEmailWorkers = async () => {
await scheduler.work<EmailPayloadType>(
QueueNames.emailQueue,
{ newJobCheckIntervalSeconds: 60 * 3, teamSize: 2 },
async (job) => {
await triggerEmail(job.data);
}
);
};

export const triggerEmail = async ({
to,
subject,
text,
html,
from,
replyTo,
}: EmailPayloadType) => {
try {
// send mail with defined transport object
await transporter.sendMail({
from: from || SMTP_FROM || `"Shelf" <[email protected]>`, // sender address
replyTo: replyTo || "[email protected]", // reply to
to, // list of receivers
subject, // Subject line
text, // plain text body
html: html || "", // html body
});
} catch (cause) {
throw new ShelfError({
cause,
message: "Unable to send email",
additionalData: { to, subject, from },
label: "Email",
});
}

// verify connection configuration
// transporter.verify(function (error) {
// if (error) {
// // eslint-disable-next-line no-console
// console.log(error);
// } else {
// // eslint-disable-next-line no-console
// console.log("Server is ready to take our messages");
// }
// });

// Message sent: <[email protected]>

// Preview only available when sending through an Ethereal account
// console.log("Preview URL: %s", nodemailer.getTestMessageUrl(info));
};
131 changes: 33 additions & 98 deletions app/emails/mail.server.ts
Original file line number Diff line number Diff line change
@@ -1,104 +1,39 @@
import type { Attachment } from "nodemailer/lib/mailer";
import { transporter } from "~/emails/transporter.server";
import { SMTP_FROM } from "../utils/env";
import { ShelfError } from "../utils/error";

export const sendEmail = async ({
to,
subject,
text,
html,
attachments,
from,
replyTo,
}: {
/** Email address of recipient */
to: string;

/** Subject of email */
subject: string;

/** Text content of email */
text: string;

/** HTML content of email */
html?: string;

attachments?: Attachment[];

/** Override the default sender */
from?: string;
import { Logger } from "~/utils/logger";
import { QueueNames, scheduler } from "~/utils/scheduler.server";
import { triggerEmail } from "./email.worker.server";
import type { EmailPayloadType } from "./types";

export const sendEmail = (payload: EmailPayloadType) => {
// attempt to send email, push to the queue if it fails
triggerEmail(payload).catch((err) => {
Logger.warn({
err,
details: {
to: payload.to,
subject: payload.subject,
from: payload.from,
},
message: "email sending failed, pushing to the queue",
});
void addToQueue(payload);
});
};

/** Override the default reply to email address */
replyTo?: string;
}) => {
const addToQueue = async (payload: EmailPayloadType) => {
try {
// send mail with defined transport object
await transporter.sendMail({
from: from || SMTP_FROM || `"Shelf" <[email protected]>`, // sender address
replyTo: replyTo || "[email protected]", // reply to
to, // list of receivers
subject, // Subject line
text, // plain text body
html: html || "", // html body
attachments: [...(attachments || [])],
await scheduler.send(QueueNames.emailQueue, payload, {
retryLimit: 5,
retryDelay: 5,
});
} catch (cause) {
throw new ShelfError({
cause,
message: "Unable to send email",
additionalData: { to, subject, from },
label: "Email",
} catch (err) {
Logger.warn({
err,
details: {
to: payload.to,
subject: payload.subject,
from: payload.from,
},
message: "Failed to push email payload to queue",
});
}

// verify connection configuration
// transporter.verify(function (error) {
// if (error) {
// // eslint-disable-next-line no-console
// console.log(error);
// } else {
// // eslint-disable-next-line no-console
// console.log("Server is ready to take our messages");
// }
// });

// Message sent: <[email protected]>

// Preview only available when sending through an Ethereal account
// console.log("Preview URL: %s", nodemailer.getTestMessageUrl(info));
};

/** Utility function to add delay between operations */
async function delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

/** Process emails in batches with rate limiting
* @param emails - Array of email configurations to send
* @param batchSize - Number of emails to process per batch (default: 2)
* @param delayMs - Milliseconds to wait between batches (default: 1000ms)
*/
export async function sendEmailsWithRateLimit(
emails: Array<{
to: string;
subject: string;
text: string;
html: string;
}>,
batchSize = 2,
delayMs = 1100
): Promise<void> {
for (let i = 0; i < emails.length; i += batchSize) {
// Process emails in batches of specified size
const batch = emails.slice(i, i + batchSize);

// Send emails in current batch concurrently
await Promise.all(batch.map((email) => sendEmail(email)));

// If there are more emails to process, add delay before next batch
if (i + batchSize < emails.length) {
await delay(delayMs);
}
}
}
20 changes: 20 additions & 0 deletions app/emails/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,23 @@ export type BookingForEmail = Prisma.BookingGetPayload<{
};
};
}>;

export type EmailPayloadType = {
/** Email address of recipient */
to: string;

/** Subject of email */
subject: string;

/** Text content of email */
text: string;

/** HTML content of email */
html?: string;

/** Override the default sender */
from?: string;

/** Override the default reply to email address */
replyTo?: string;
};
10 changes: 10 additions & 0 deletions app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { RemixServer } from "@remix-run/react";
import * as Sentry from "@sentry/remix";
import { isbot } from "isbot";
import { renderToPipeableStream } from "react-dom/server";
import { registerEmailWorkers } from "./emails/email.worker.server";
import { registerBookingWorkers } from "./modules/booking/worker.server";
import { ShelfError } from "./utils/error";
import { Logger } from "./utils/logger";
Expand All @@ -26,6 +27,15 @@ schedulerService
})
);
});
await registerEmailWorkers().catch((cause) => {
Logger.error(
new ShelfError({
cause,
message: "Something went wrong while registering email workers.",
label: "Scheduler",
})
);
});
})
.finally(() => {
// eslint-disable-next-line no-console
Expand Down
3 changes: 0 additions & 3 deletions app/modules/booking/constants.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
export const schedulerKeys = {
bookingQueue: "booking-queue",
};
export enum bookingSchedulerEventsEnum {
checkoutReminder = `booking-checkout-reminder`,
checkinReminder = `booking-checkin-reminder`,
Expand Down
59 changes: 25 additions & 34 deletions app/modules/booking/email-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,44 +156,35 @@ export const checkinReminderEmailContent = ({
)}.`,
});

export async function sendCheckinReminder(
export function sendCheckinReminder(
booking: BookingForEmail,
assetCount: number,
hints: ClientHint
) {
try {
await sendEmail({
to: booking.custodianUser!.email,
subject: `Checkin reminder (${booking.name}) - shelf.nu`,
text: checkinReminderEmailContent({
hints,
bookingName: booking.name,
assetsCount: assetCount,
custodian:
`${booking.custodianUser!.firstName} ${booking.custodianUser
?.lastName}` || (booking.custodianTeamMember?.name as string),
from: booking.from!,
to: booking.to!,
bookingId: booking.id,
}),
html: bookingUpdatesTemplateString({
booking,
heading: `Your booking is due for checkin in ${getTimeRemainingMessage(
new Date(booking.to!),
new Date()
)}.`,
assetCount,
hints,
}),
});
} catch (cause) {
throw new ShelfError({
cause,
message: "Something went wrong while sending the checkin reminder email",
additionalData: { booking },
label: "Booking",
});
}
sendEmail({
to: booking.custodianUser!.email,
subject: `Checkin reminder (${booking.name}) - shelf.nu`,
text: checkinReminderEmailContent({
hints,
bookingName: booking.name,
assetsCount: assetCount,
custodian:
`${booking.custodianUser!.firstName} ${booking.custodianUser
?.lastName}` || (booking.custodianTeamMember?.name as string),
from: booking.from!,
to: booking.to!,
bookingId: booking.id,
}),
html: bookingUpdatesTemplateString({
booking,
heading: `Your booking is due for checkin in ${getTimeRemainingMessage(
new Date(booking.to!),
new Date()
)}.`,
assetCount,
hints,
}),
});
}

/**
Expand Down
12 changes: 6 additions & 6 deletions app/modules/booking/service.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type {
} from "@prisma/client";
import { db } from "~/database/db.server";
import { bookingUpdatesTemplateString } from "~/emails/bookings-updates-template";
import { sendEmail, sendEmailsWithRateLimit } from "~/emails/mail.server";
import { sendEmail } from "~/emails/mail.server";
import { getStatusClasses, isOneDayEvent } from "~/utils/calendar";
import { getDateTimeFormat } from "~/utils/client-hints";
import { calcTimeDifference } from "~/utils/date-fns";
Expand All @@ -21,9 +21,9 @@ import { getRedirectUrlFromRequest } from "~/utils/http";
import { getCurrentSearchParams } from "~/utils/http.server";
import { ALL_SELECTED_KEY } from "~/utils/list";
import { Logger } from "~/utils/logger";
import { scheduler } from "~/utils/scheduler.server";
import { QueueNames, scheduler } from "~/utils/scheduler.server";
import type { MergeInclude } from "~/utils/utils";
import { bookingSchedulerEventsEnum, schedulerKeys } from "./constants";
import { bookingSchedulerEventsEnum } from "./constants";
import {
assetReservedEmailContent,
cancelledBookingEmailContent,
Expand Down Expand Up @@ -82,7 +82,7 @@ export async function scheduleNextBookingJob({
}) {
try {
const id = await scheduler.sendAfter(
schedulerKeys.bookingQueue,
QueueNames.bookingQueue,
data,
{},
when
Expand Down Expand Up @@ -843,7 +843,7 @@ export async function deleteBooking(
hideViewButton: true,
});

await sendEmail({
sendEmail({
to: email,
subject,
text,
Expand Down Expand Up @@ -1341,7 +1341,7 @@ export async function bulkDeleteBookings({
}));

// Send emails with rate limiting
return await sendEmailsWithRateLimit(emailConfigs);
return emailConfigs.map(sendEmail);
} catch (cause) {
const message =
cause instanceof ShelfError
Expand Down
Loading

0 comments on commit 1c22c8a

Please sign in to comment.