Loading...
Loading...
> Process time-consuming tasks asynchronously with job queues, workers, and the email worker system.
Background jobs allow you to offload time-consuming tasks from your API routes to dedicated workers. This improves response times and provides better reliability with automatic retries and error handling.
Job model in Prisma schema
// prisma/schema.prisma

/// A unit of background work stored in the database and picked up by workers.
model Job {
  id      String @id @default(cuid())
  queue   String // "default", "email", "webhooks"
  type    String // Job type identifier
  payload Json   // Job data

  status   String @default("pending") // pending, processing, completed, failed
  priority Int    @default(0)

  attempts    Int @default(0)
  maxAttempts Int @default(3)

  error  String? // Last error message
  result Json?   // Job result

  runAt       DateTime  @default(now())
  startedAt   DateTime?
  completedAt DateTime?

  createdAt DateTime @default(now())

  @@index([queue, status, runAt])
  @@index([status, runAt])
}
Core service for managing job queues
// src/lib/jobs/queue.ts
import { prisma } from "@/lib/db";

/** Optional settings accepted by `enqueueJob`. */
interface JobOptions {
  queue?: string;
  priority?: number;
  delay?: number; // milliseconds
  maxAttempts?: number;
}

/** How many times `getNextJob` re-selects a candidate after losing a claim race. */
const MAX_CLAIM_ATTEMPTS = 5;

/** Delay before the first retry; doubled for every further retry. */
const BASE_RETRY_DELAY_MS = 60 * 1000;

/**
 * Creates a job row that becomes eligible to run `delay` milliseconds from now.
 *
 * @param type - Job type identifier stored on the row.
 * @param payload - Data stored on the row as JSON.
 * @param options - Queue name, priority, delay and retry limit; all optional.
 * @returns The created job record.
 */
export async function enqueueJob(
  type: string,
  payload: Record<string, any>,
  options: JobOptions = {}
) {
  const {
    queue = "default",
    priority = 0,
    delay = 0,
    maxAttempts = 3,
  } = options;

  const runAt = new Date(Date.now() + delay);

  const job = await prisma.job.create({
    data: {
      queue,
      type,
      payload,
      priority,
      maxAttempts,
      runAt,
    },
  });

  return job;
}

/**
 * Claims the next runnable job on `queue`: highest priority first, then oldest.
 *
 * The claim is a conditional update that only succeeds while the row is still
 * "pending", so two workers polling at the same moment cannot both receive the
 * same job. When another worker wins the race, the next candidate is tried.
 *
 * @param queue - Queue name to poll.
 * @returns The claimed job (status "processing", attempts incremented), or
 *   null when nothing is runnable or every claim attempt lost its race.
 */
export async function getNextJob(queue: string = "default") {
  for (let attempt = 0; attempt < MAX_CLAIM_ATTEMPTS; attempt++) {
    // Get the next pending job that's ready to run
    const candidate = await prisma.job.findFirst({
      where: {
        queue,
        status: "pending",
        runAt: { lte: new Date() },
      },
      orderBy: [
        { priority: "desc" },
        { createdAt: "asc" },
      ],
    });

    if (!candidate) return null;

    // Mark as processing, but only if nobody else claimed it in the meantime
    const claimed = await prisma.job.updateMany({
      where: { id: candidate.id, status: "pending" },
      data: {
        status: "processing",
        startedAt: new Date(),
        attempts: { increment: 1 },
      },
    });

    if (claimed.count === 1) {
      return prisma.job.findUnique({ where: { id: candidate.id } });
    }
    // Lost the race for this row; loop to pick a different candidate.
  }

  return null;
}

/**
 * Marks a job as completed and stores its optional result.
 *
 * @param jobId - Id of the job to complete.
 * @param result - Optional JSON-serialisable result to store.
 * @returns The updated job record.
 */
export async function completeJob(
  jobId: string,
  result?: Record<string, any>
) {
  return prisma.job.update({
    where: { id: jobId },
    data: {
      status: "completed",
      completedAt: new Date(),
      result,
    },
  });
}

/**
 * Records a failure. The job is rescheduled with exponential backoff while
 * attempts remain, and marked "failed" otherwise.
 *
 * @param jobId - Id of the job that failed.
 * @param error - Error message stored on the row.
 * @returns The updated job record, or undefined when the job does not exist.
 */
export async function failJob(
  jobId: string,
  error: string
) {
  const job = await prisma.job.findUnique({
    where: { id: jobId },
  });

  if (!job) return;

  // Check if we should retry
  if (job.attempts < job.maxAttempts) {
    // Exponential backoff: 1min, 2min, 4min...
    // `attempts` was already incremented when the job was claimed, so the
    // first failure arrives with attempts === 1 and must map to exponent 0.
    const delay = BASE_RETRY_DELAY_MS * 2 ** Math.max(job.attempts - 1, 0);

    return prisma.job.update({
      where: { id: jobId },
      data: {
        status: "pending",
        runAt: new Date(Date.now() + delay),
        error,
      },
    });
  }

  // Max retries exceeded
  return prisma.job.update({
    where: { id: jobId },
    data: {
      status: "failed",
      error,
    },
  });
}
Create workers to process jobs
// scripts/worker.ts
import { getNextJob, completeJob, failJob } from "@/lib/jobs/queue";

// Job handlers, keyed by job type. Each receives the job payload and resolves
// to the value stored as the job's result.
const handlers: Record<string, (payload: any) => Promise<any>> = {
  "send-welcome-email": async (payload) => {
    const { userId, email } = payload;
    // NOTE(review): sendWelcomeEmail is not imported in this file; import it
    // from wherever the project defines it.
    await sendWelcomeEmail(email);
    return { sent: true };
  },

  "process-payment": async (payload) => {
    const { paymentId } = payload;
    // Process payment logic...
    return { processed: true };
  },

  "generate-report": async (payload) => {
    const { userId, reportType } = payload;
    // Generate report logic...
    return { reportUrl: "/reports/123.pdf" };
  },

  "webhook-delivery": async (payload) => {
    const { webhookId, event, data } = payload;
    // Deliver webhook logic...
    return { delivered: true };
  },
};

/**
 * Polls `queue` forever: claims a job, runs the handler registered for its
 * type, and records the outcome with completeJob or failJob.
 *
 * @param queue - Name of the queue this worker serves.
 */
async function processJobs(queue: string = "default") {
  while (true) {
    const job = await getNextJob(queue);

    if (!job) {
      // No jobs, wait before polling again
      await sleep(1000);
      continue;
    }

    try {
      const handler = handlers[job.type];

      if (!handler) {
        throw new Error(`Unknown job type: ${job.type}`);
      }

      const result = await handler(job.payload);
      await completeJob(job.id, result);
    } catch (error) {
      // The binding must be named `error`: the lines below read it, and an
      // unused `_` binding would turn every job failure into a ReferenceError.
      const message = error instanceof Error ? error.message : "Unknown error";
      await failJob(job.id, message);

      console.error(`Failed job ${job.id}: ${message}`);
    }
  }
}

/** Resolves after `ms` milliseconds. */
function sleep(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Start worker. Errors that escape the loop (for example the database being
// unreachable) are logged and end the process with a non-zero exit code
// instead of surfacing as an unhandled rejection.
processJobs(process.env.QUEUE || "default").catch((error) => {
  console.error("Worker crashed:", error);
  process.exit(1);
});
Dedicated worker for processing email queue
// scripts/email-worker.ts
import { Resend } from "resend";
import { getNextJob, completeJob, failJob } from "@/lib/jobs/queue";
import { WelcomeEmail } from "@/emails/welcome";
import { PasswordResetEmail } from "@/emails/password-reset";
import { InvoiceEmail } from "@/emails/invoice";

const resend = new Resend(process.env.RESEND_API_KEY);

// Templates are invoked as plain functions below, so they are typed as
// function components rather than the wider ComponentType.
const emailTemplates: Record<string, React.FunctionComponent<any>> = {
  welcome: WelcomeEmail,
  "password-reset": PasswordResetEmail,
  invoice: InvoiceEmail,
};

/** Fields this worker reads from an email job's payload. */
interface EmailJobPayload {
  template: string;
  to: string | string[];
  subject: string;
  props?: unknown;
}

/**
 * Checks that a stored job payload has the fields this worker reads.
 *
 * @param payload - Raw JSON value taken from the job row.
 * @returns The payload, typed.
 * @throws Error when the payload is not an object or a required field is
 *   missing or of the wrong type.
 */
function parseEmailPayload(payload: unknown): EmailJobPayload {
  if (typeof payload !== "object" || payload === null || Array.isArray(payload)) {
    throw new Error("Email job payload must be an object");
  }

  const { template, to, subject, props } = payload as Record<string, unknown>;

  if (typeof template !== "string") {
    throw new Error("Email job payload is missing a string `template`");
  }
  const toIsValid =
    typeof to === "string" ||
    (Array.isArray(to) && to.every((entry) => typeof entry === "string"));
  if (!toIsValid) {
    throw new Error("Email job payload is missing a valid `to`");
  }
  if (typeof subject !== "string") {
    throw new Error("Email job payload is missing a string `subject`");
  }

  return { template, to: to as string | string[], subject, props };
}

/**
 * Polls the "email" queue forever: renders the requested template, sends it
 * through Resend, and records the outcome with completeJob or failJob.
 */
async function processEmailQueue() {
  while (true) {
    const job = await getNextJob("email");

    if (!job) {
      await sleep(1000);
      continue;
    }

    try {
      // Parsed inside the try so a malformed payload fails this one job
      // instead of crashing the worker.
      const { template, to, subject, props } = parseEmailPayload(job.payload);

      const EmailTemplate = emailTemplates[template];

      if (!EmailTemplate) {
        throw new Error(`Unknown email template: ${template}`);
      }

      const { data, error: sendError } = await resend.emails.send({
        from: process.env.EMAIL_FROM!,
        to,
        subject,
        react: EmailTemplate(props),
      });

      if (sendError) {
        throw new Error(sendError.message);
      }

      await completeJob(job.id, { emailId: data?.id });
    } catch (error) {
      // The binding must be named `error`: the response field of the same
      // name is scoped to the try block and is not visible here.
      const message = error instanceof Error ? error.message : "Unknown error";
      await failJob(job.id, message);
      console.error(`Failed to send email: ${message}`);
    }
  }
}

/** Resolves after `ms` milliseconds. */
function sleep(ms: number) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

// Errors that escape the loop are logged and end the process with a non-zero
// exit code instead of surfacing as an unhandled rejection.
processEmailQueue().catch((error) => {
  console.error("Email worker crashed:", error);
  process.exit(1);
});
Queue jobs from your API routes
// In API routes
import { prisma } from "@/lib/db";
import { enqueueJob } from "@/lib/jobs/queue";

// Queue an email
export async function POST(req: Request) {
  const { email, name } = await req.json();

  // Create user...
  const user = await prisma.user.create({ /* ... */ });

  // Queue welcome email (processed by email worker)
  await enqueueJob(
    "send-email",
    {
      template: "welcome",
      to: email,
      subject: "Welcome to Fabrk!",
      props: { name },
    },
    { queue: "email" }
  );

  return Response.json({ user });
}

// The two snippets below are fragments: place them inside a handler where
// `user` and `webhook` are in scope.

// Queue a report with delay
await enqueueJob(
  "generate-report",
  {
    userId: user.id,
    reportType: "monthly",
  },
  {
    queue: "default",
    delay: 5000, // Wait 5 seconds
    priority: 10, // Higher priority
  }
);

// Queue webhook delivery
await enqueueJob(
  "webhook-delivery",
  {
    webhookId: webhook.id,
    event: "user.created",
    data: { userId: user.id },
  },
  {
    queue: "webhooks",
    maxAttempts: 5,
  }
);
Start workers for different queues
# Development (with auto-restart)
npm run jobs:dev   # Default queue worker
npm run email:dev  # Email queue worker

# Production
node scripts/worker.js
# The email queue is served by the dedicated email worker. The generic worker
# has no handler for the "send-email" job type and would fail those jobs.
node scripts/email-worker.js
QUEUE=webhooks node scripts/worker.js

# Docker Compose example
# docker-compose.yml
services:
  app:
    build: .
    command: npm start

  worker-default:
    build: .
    command: node scripts/worker.js
    environment:
      - QUEUE=default

  # Dedicated email worker; it always polls the "email" queue, so it needs no
  # QUEUE variable.
  worker-email:
    build: .
    command: node scripts/email-worker.js

  worker-webhooks:
    build: .
    command: node scripts/worker.js
    environment:
      - QUEUE=webhooks