1. BullMQ คืออะไร
BullMQ เป็น high-performance Node.js library สำหรับจัดการ Job Queues ที่ใช้ Redis เป็น message broker พัฒนาต่อจาก Bull library โดยทีมเดียวกัน แต่เขียนใหม่ทั้งหมดด้วย TypeScript
BullMQ เป็น solution ที่นิยมใช้สำหรับ Background Job Processing ใน Production environments เพราะมี features ครบครันและมีความเสถียรสูง
Key Point: BullMQ ใช้ Redis เป็นหลักในการจัดเก็บ jobs ทำให้สามารถ scale ได้ง่ายและรองรับ distributed processing
2. ทำไมต้องใช้ Job Queue
เมื่อ application ต้องทำงานที่ใช้เวลานาน เช่น ส่ง emails, ประมวลผลไฟล์, หรือเรียก external APIs การทำงานแบบ synchronous จะทำให้ user ต้องรอนาน
ปัญหาที่พบบ่อย:
- HTTP Request timeout เมื่อประมวลผลนานเกินไป
- User experience แย่เพราะต้องรอ response
- Server resource ถูกใช้จนหมด
- ไม่สามารถ retry เมื่อเกิด error
Job Queue ช่วยแก้ปัญหาเหล่านี้:
- Asynchronous Processing - ทำงานเบื้องหลัง ไม่ block request
- Automatic Retries - retry อัตโนมัติเมื่อ job ล้มเหลว
- Rate Limiting - ควบคุมปริมาณงานที่ประมวลผลพร้อมกัน
- Priority - จัดลำดับความสำคัญของ jobs
- Scalability - เพิ่ม workers ได้ตามต้องการ
3. การติดตั้ง
1 ติดตั้ง Redis
BullMQ ต้องการ Redis version 5.0 ขึ้นไป:
# ติดตั้ง Redis
sudo apt update
sudo apt install redis-server -y
# เริ่ม Redis service
sudo systemctl start redis-server
sudo systemctl enable redis-server
# ตรวจสอบ Redis status
redis-cli ping
# ควรได้ response: PONG
# รัน Redis ด้วย Docker
docker run -d --name redis \
-p 6379:6379 \
redis:7-alpine
# ตรวจสอบว่า Redis ทำงาน
docker ps | grep redis
2 ติดตั้ง BullMQ
# สร้างโปรเจค
mkdir bullmq-demo && cd bullmq-demo
npm init -y
# ติดตั้ง BullMQ
npm install bullmq
# ติดตั้ง ioredis ( Redis client)
npm install ioredis
4. BullMQ Architecture
Producer
Application ที่สร้างและเพิ่ม jobs เข้า queue
Redis
จัดเก็บ jobs และจัดการ queue state
Workers
Process jobs จาก queue ทีละ job
5. การใช้งานพื้นฐาน
สร้าง Queue และเพิ่ม Jobs
const { Queue } = require('bullmq');
// สร้าง Queue
const emailQueue = new Queue('email-queue', {
connection: {
host: 'localhost',
port: 6379,
},
});
// เพิ่ม Job เบื้องต้น
async function sendEmail() {
await emailQueue.add('send-welcome', {
to: 'user@example.com',
subject: 'Welcome!',
body: 'Welcome to our service!',
});
console.log('Job added to queue!');
}
// เพิ่ม Job พร้อม options
async function sendEmailWithOptions() {
await emailQueue.add('send-notification', {
to: 'user@example.com',
template: 'notification',
}, {
// ดีเลย์ 5 วินาที
delay: 5000,
// Priority สูง (ค่าน้อย = priority สูง)
priority: 1,
// จำกัดจำนวน retries
attempts: 3,
// Backoff strategy
backoff: {
type: 'exponential',
delay: 1000,
},
// Job timeout
timeout: 30000,
});
}
sendEmail();
สร้าง Worker สำหรับประมวลผล Jobs
const { Worker } = require('bullmq');
// สร้าง Worker
const worker = new Worker(
'email-queue',
async (job) => {
console.log(`Processing job ${job.id} - ${job.name}`);
console.log('Job data:', job.data);
// จำลองการส่ง email
const { to, subject, body } = job.data;
// อัพเดท progress
await job.updateProgress(50);
// ส่ง email (ตัวอย่าง)
await sendEmailToUser(to, subject, body);
// อัพเดท progress เป็น 100%
await job.updateProgress(100);
// Return ผลลัพธ์
return { success: true, sentAt: new Date() };
},
{
connection: {
host: 'localhost',
port: 6379,
},
// จำนวน concurrent jobs
concurrency: 5,
}
);
// Event listeners
worker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result:`, result);
});
worker.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err.message);
});
worker.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
// Mock function
async function sendEmailToUser(to, subject, body) {
console.log(`Sending email to ${to}...`);
await new Promise(resolve => setTimeout(resolve, 1000));
console.log('Email sent!');
}
TypeScript Example
import { Queue, Worker, Job } from 'bullmq';
// Define job data type
interface EmailJobData {
to: string;
subject: string;
body: string;
template?: string;
}
// Define job return type
interface EmailJobResult {
success: boolean;
messageId: string;
sentAt: Date;
}
// Create typed queue
const emailQueue = new Queue('email-queue');
// Create typed worker
const worker = new Worker(
'email-queue',
async (job: Job) => {
const { to, subject, body } = job.data;
// Type-safe job processing
const messageId = `msg_${Date.now()}`;
// Business logic here...
return {
success: true,
messageId,
sentAt: new Date(),
};
}
);
6. คุณสมบัติขั้นสูง
Delayed Jobs
สร้าง jobs ที่จะทำงานในเวลาที่กำหนด:
// ดีเลย์ 1 นาที
await emailQueue.add('reminder', {
userId: 123,
message: 'Please complete your profile',
}, {
delay: 60000, // 60 seconds
});
// กำหนดเวลาที่แน่นอน
const scheduleTime = new Date('2026-03-01T09:00:00Z');
await emailQueue.add('scheduled-email', {
to: 'user@example.com',
subject: 'Monthly Report',
}, {
delay: scheduleTime.getTime() - Date.now(),
});
Repeatable Jobs (Cron)
สร้าง jobs ที่ทำงานซ้ำตามกำหนดเวลา:
// ทุก ๆ 10 นาที
await emailQueue.add('cleanup', {}, {
repeat: {
every: 600000, // 10 minutes in ms
},
});
// ใช้ Cron expression - ทุกวันเวลา 09:00
await emailQueue.add('daily-report', {
type: 'daily',
}, {
repeat: {
pattern: '0 9 * * *', // Every day at 9:00 AM
},
});
// ทุกวันจันทร์เวลา 10:00
await emailQueue.add('weekly-summary', {
type: 'weekly',
}, {
repeat: {
pattern: '0 10 * * 1', // Every Monday at 10:00 AM
},
});
// หยุด repeatable job
const repeatableJobs = await emailQueue.getRepeatableJobs();
for (const job of repeatableJobs) {
if (job.name === 'daily-report') {
await emailQueue.removeRepeatableByKey(job.key);
}
}
Priority Queues
จัดลำดับความสำคัญของ jobs (ค่าน้อย = priority สูง):
// High priority (ค่าน้อย)
await emailQueue.add('urgent-email', {
to: 'admin@example.com',
subject: 'Server Alert!',
}, { priority: 1 });
// Normal priority
await emailQueue.add('regular-email', {
to: 'user@example.com',
subject: 'Newsletter',
}, { priority: 10 });
// Low priority (ค่ามาก)
await emailQueue.add('bulk-email', {
to: 'subscribers@example.com',
subject: 'Monthly Update',
}, { priority: 100 });
Flows (Job Dependencies)
สร้าง chain หรือ tree ของ jobs ที่มี dependencies:
const { FlowProducer } = require('bullmq');
const flowProducer = new FlowProducer();
// สร้าง flow แบบ chain
await flowProducer.add({
name: 'process-order',
queueName: 'order-queue',
data: { orderId: 123 },
children: [
{
name: 'validate-payment',
queueName: 'payment-queue',
data: { orderId: 123 },
children: [
{
name: 'send-receipt',
queueName: 'email-queue',
data: { orderId: 123, email: 'user@example.com' },
},
],
},
{
name: 'update-inventory',
queueName: 'inventory-queue',
data: { orderId: 123 },
},
],
});
// Flow จะทำงานจากล่างขึ้นบน:
// 1. validate-payment -> send-receipt (child)
// 2. update-inventory (parallel)
// 3. process-order (parent - รอ children เสร็จทั้งหมด)
7. Error Handling & Retries
Retry Strategy
await emailQueue.add('send-email', {
to: 'user@example.com',
subject: 'Test',
}, {
// จำนวนครั้งที่ retry
attempts: 5,
// Backoff strategy
backoff: {
// 'fixed' หรือ 'exponential'
type: 'exponential',
// ดีเลย์เริ่มต้น (ms)
delay: 1000,
},
});
// ใน Worker - throw error เพื่อ trigger retry
const worker = new Worker('email-queue', async (job) => {
try {
// พยายามส่ง email
await sendEmail(job.data);
} catch (error) {
// Check ว่าเป็น error ที่ควร retry หรือไม่
if (error.code === 'RATE_LIMIT') {
// throw เพื่อ retry
throw error;
} else {
// ไม่ retry - ถือว่า job failed ถาวร
await job.moveToFailed(error, 'No retry for this error');
return;
}
}
});
Dead Letter Queue
จัดการ jobs ที่ fail ถาวร:
const { Queue, Worker } = require('bullmq');
// Main queue
const mainQueue = new Queue('main-queue');
// Dead Letter Queue
const dlq = new Queue('dead-letter-queue');
const worker = new Worker('main-queue', async (job) => {
try {
await processJob(job);
} catch (error) {
// ถ้า retry ครบแล้ว ย้ายไป DLQ
if (job.attemptsMade >= job.opts.attempts) {
await dlq.add('failed-job', {
originalJob: job.data,
error: error.message,
failedAt: new Date(),
attemptsMade: job.attemptsMade,
});
}
throw error;
}
}, {
// Maximum attempts
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
});
// Worker สำหรับ DLQ - ส่ง notification หรือ log
const dlqWorker = new Worker('dead-letter-queue', async (job) => {
console.error('Job failed permanently:', job.data);
// ส่ง alert ไปทีม dev
await sendAlert({
type: 'job_failed',
data: job.data,
});
});
8. Production Deployment Tips
Redis Configuration
// redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru
save "" # ปิด persistence ถ้าไม่จำเป็น
Worker Configuration
const worker = new Worker('my-queue', processor, {
connection: {
host: process.env.REDIS_HOST,
port: 6379,
maxRetriesPerRequest: null, // สำคัญ!
enableReadyCheck: false,
},
concurrency: 10, // จำนวน concurrent jobs
limiter: {
max: 100, // max jobs
duration: 1000, // per second
},
});
Graceful Shutdown
process.on('SIGTERM', async () => {
console.log('Shutting down worker...');
// หยุดรับ jobs ใหม่
await worker.close();
// รอ jobs ที่กำลังทำอยู่เสร็จ
// (worker.close() จะทำให้)
process.exit(0);
});
9. Best Practices
Keep Jobs Small
เก็บเฉพาะ references ใน job data ไม่เก็บไฟล์หรือข้อมูลขนาดใหญ่
Idempotent Jobs
ออกแบบให้ job สามารถรันซ้ำได้โดยไม่เกิด side effects
Secure Redis
ใช้ Redis ACLs และ TLS ใน production environment
Monitor Queue Health
ติดตาม waiting, active, completed, failed counts
Set Reasonable Timeouts
กำหนด timeout สำหรับแต่ละ job type
สรุป
BullMQ เป็น solution ที่ยอดเยี่ยมสำหรับ Node.js applications ที่ต้องการ background job processing โดยมีจุดเด่น:
- ใช้ Redis ทำให้ scale ได้ง่าย
- รองรับ TypeScript อย่างสมบูรณ์
- Features ครบครัน (retries, delays, priorities, flows)
- Active community และ documentation ดี
- Production-ready และ battle-tested