Feb 2026 • Node.js + Redis

BullMQ

Node.js Job Queue ที่ทรงพลังที่สุด สร้าง Background Jobs ด้วย Redis

รองรับ Retries, Delays, Priority Queues, Flows และ Distributed Processing สำหรับ Production-Ready Applications

Node.js
Redis
TypeScript
Queue

1. BullMQ คืออะไร

BullMQ เป็น high-performance Node.js library สำหรับจัดการ Job Queues ที่ใช้ Redis เป็น message broker พัฒนาต่อจาก Bull library โดยทีมเดียวกัน แต่เขียนใหม่ทั้งหมดด้วย TypeScript

BullMQ เป็น solution ที่นิยมใช้สำหรับ Background Job Processing ใน Production environments เพราะมี features ครบครันและมีความเสถียรสูง

Key Point: BullMQ ใช้ Redis เป็นหลักในการจัดเก็บ jobs ทำให้สามารถ scale ได้ง่ายและรองรับ distributed processing

2. ทำไมต้องใช้ Job Queue

เมื่อ application ต้องทำงานที่ใช้เวลานาน เช่น ส่ง emails, ประมวลผลไฟล์, หรือเรียก external APIs การทำงานแบบ synchronous จะทำให้ user ต้องรอนาน

ปัญหาที่พบบ่อย:

  • HTTP Request timeout เมื่อประมวลผลนานเกินไป
  • User experience แย่เพราะต้องรอ response
  • Server resource ถูกใช้จนหมด
  • ไม่สามารถ retry เมื่อเกิด error

Job Queue ช่วยแก้ปัญหาเหล่านี้:

  • Asynchronous Processing - ทำงานเบื้องหลัง ไม่ block request
  • Automatic Retries - retry อัตโนมัติเมื่อ job ล้มเหลว
  • Rate Limiting - ควบคุมปริมาณงานที่ประมวลผลพร้อมกัน
  • Priority - จัดลำดับความสำคัญของ jobs
  • Scalability - เพิ่ม workers ได้ตามต้องการ

3. การติดตั้ง

1 ติดตั้ง Redis

BullMQ ต้องการ Redis version 5.0 ขึ้นไป:

Terminal - Ubuntu/Debian
# ติดตั้ง Redis
sudo apt update
sudo apt install redis-server -y

# เริ่ม Redis service
sudo systemctl start redis-server
sudo systemctl enable redis-server

# ตรวจสอบ Redis status
redis-cli ping
# ควรได้ response: PONG
Terminal - Docker
# รัน Redis ด้วย Docker
docker run -d --name redis \
  -p 6379:6379 \
  redis:7-alpine

# ตรวจสอบว่า Redis ทำงาน
docker ps | grep redis

2 ติดตั้ง BullMQ

Terminal
# สร้างโปรเจค
mkdir bullmq-demo && cd bullmq-demo
npm init -y

# ติดตั้ง BullMQ
npm install bullmq

# ติดตั้ง ioredis ( Redis client)
npm install ioredis

4. BullMQ Architecture

Producer Node.js App queue.add() Job Data Redis Waiting List Active List Completed/Failed Workers Worker 1 Process: Email Status: Active Worker 2 Process: PDF Status: Active Worker 3 Process: Report Status: Idle 1. Add Job 2. Process

Producer

Application ที่สร้างและเพิ่ม jobs เข้า queue

Redis

จัดเก็บ jobs และจัดการ queue state

Workers

Process jobs จาก queue ทีละ job

5. การใช้งานพื้นฐาน

สร้าง Queue และเพิ่ม Jobs

producer.js
const { Queue } = require('bullmq');

// สร้าง Queue
const emailQueue = new Queue('email-queue', {
  connection: {
    host: 'localhost',
    port: 6379,
  },
});

// เพิ่ม Job เบื้องต้น
async function sendEmail() {
  await emailQueue.add('send-welcome', {
    to: 'user@example.com',
    subject: 'Welcome!',
    body: 'Welcome to our service!',
  });
  
  console.log('Job added to queue!');
}

// เพิ่ม Job พร้อม options
async function sendEmailWithOptions() {
  await emailQueue.add('send-notification', {
    to: 'user@example.com',
    template: 'notification',
  }, {
    // ดีเลย์ 5 วินาที
    delay: 5000,
    
    // Priority สูง (ค่าน้อย = priority สูง)
    priority: 1,
    
    // จำกัดจำนวน retries
    attempts: 3,
    
    // Backoff strategy
    backoff: {
      type: 'exponential',
      delay: 1000,
    },
    
    // Job timeout
    timeout: 30000,
  });
}

sendEmail();

สร้าง Worker สำหรับประมวลผล Jobs

worker.js
const { Worker } = require('bullmq');

// สร้าง Worker
const worker = new Worker(
  'email-queue',
  async (job) => {
    console.log(`Processing job ${job.id} - ${job.name}`);
    console.log('Job data:', job.data);
    
    // จำลองการส่ง email
    const { to, subject, body } = job.data;
    
    // อัพเดท progress
    await job.updateProgress(50);
    
    // ส่ง email (ตัวอย่าง)
    await sendEmailToUser(to, subject, body);
    
    // อัพเดท progress เป็น 100%
    await job.updateProgress(100);
    
    // Return ผลลัพธ์
    return { success: true, sentAt: new Date() };
  },
  {
    connection: {
      host: 'localhost',
      port: 6379,
    },
    // จำนวน concurrent jobs
    concurrency: 5,
  }
);

// Event listeners
worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

worker.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});

worker.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});

// Mock function
async function sendEmailToUser(to, subject, body) {
  console.log(`Sending email to ${to}...`);
  await new Promise(resolve => setTimeout(resolve, 1000));
  console.log('Email sent!');
}

TypeScript Example

queue.ts
import { Queue, Worker, Job } from 'bullmq';

// Define job data type
interface EmailJobData {
  to: string;
  subject: string;
  body: string;
  template?: string;
}

// Define job return type
interface EmailJobResult {
  success: boolean;
  messageId: string;
  sentAt: Date;
}

// Create typed queue
const emailQueue = new Queue('email-queue');

// Create typed worker
const worker = new Worker(
  'email-queue',
  async (job: Job) => {
    const { to, subject, body } = job.data;
    
    // Type-safe job processing
    const messageId = `msg_${Date.now()}`;
    
    // Business logic here...
    
    return {
      success: true,
      messageId,
      sentAt: new Date(),
    };
  }
);

6. คุณสมบัติขั้นสูง

Delayed Jobs

สร้าง jobs ที่จะทำงานในเวลาที่กำหนด:

delayed-jobs.js
// ดีเลย์ 1 นาที
await emailQueue.add('reminder', {
  userId: 123,
  message: 'Please complete your profile',
}, {
  delay: 60000, // 60 seconds
});

// กำหนดเวลาที่แน่นอน
const scheduleTime = new Date('2026-03-01T09:00:00Z');
await emailQueue.add('scheduled-email', {
  to: 'user@example.com',
  subject: 'Monthly Report',
}, {
  delay: scheduleTime.getTime() - Date.now(),
});

Repeatable Jobs (Cron)

สร้าง jobs ที่ทำงานซ้ำตามกำหนดเวลา:

repeatable-jobs.js
// ทุก ๆ 10 นาที
await emailQueue.add('cleanup', {}, {
  repeat: {
    every: 600000, // 10 minutes in ms
  },
});

// ใช้ Cron expression - ทุกวันเวลา 09:00
await emailQueue.add('daily-report', {
  type: 'daily',
}, {
  repeat: {
    pattern: '0 9 * * *', // Every day at 9:00 AM
  },
});

// ทุกวันจันทร์เวลา 10:00
await emailQueue.add('weekly-summary', {
  type: 'weekly',
}, {
  repeat: {
    pattern: '0 10 * * 1', // Every Monday at 10:00 AM
  },
});

// หยุด repeatable job
const repeatableJobs = await emailQueue.getRepeatableJobs();
for (const job of repeatableJobs) {
  if (job.name === 'daily-report') {
    await emailQueue.removeRepeatableByKey(job.key);
  }
}

Priority Queues

จัดลำดับความสำคัญของ jobs (ค่าน้อย = priority สูง):

priority-queues.js
// High priority (ค่าน้อย)
await emailQueue.add('urgent-email', {
  to: 'admin@example.com',
  subject: 'Server Alert!',
}, { priority: 1 });

// Normal priority
await emailQueue.add('regular-email', {
  to: 'user@example.com',
  subject: 'Newsletter',
}, { priority: 10 });

// Low priority (ค่ามาก)
await emailQueue.add('bulk-email', {
  to: 'subscribers@example.com',
  subject: 'Monthly Update',
}, { priority: 100 });

Flows (Job Dependencies)

สร้าง chain หรือ tree ของ jobs ที่มี dependencies:

flows.js
const { FlowProducer } = require('bullmq');

const flowProducer = new FlowProducer();

// สร้าง flow แบบ chain
await flowProducer.add({
  name: 'process-order',
  queueName: 'order-queue',
  data: { orderId: 123 },
  children: [
    {
      name: 'validate-payment',
      queueName: 'payment-queue',
      data: { orderId: 123 },
      children: [
        {
          name: 'send-receipt',
          queueName: 'email-queue',
          data: { orderId: 123, email: 'user@example.com' },
        },
      ],
    },
    {
      name: 'update-inventory',
      queueName: 'inventory-queue',
      data: { orderId: 123 },
    },
  ],
});

// Flow จะทำงานจากล่างขึ้นบน:
// 1. validate-payment -> send-receipt (child)
// 2. update-inventory (parallel)
// 3. process-order (parent - รอ children เสร็จทั้งหมด)

7. Error Handling & Retries

Retry Strategy

retry-strategy.js
await emailQueue.add('send-email', {
  to: 'user@example.com',
  subject: 'Test',
}, {
  // จำนวนครั้งที่ retry
  attempts: 5,
  
  // Backoff strategy
  backoff: {
    // 'fixed' หรือ 'exponential'
    type: 'exponential',
    
    // ดีเลย์เริ่มต้น (ms)
    delay: 1000,
  },
});

// ใน Worker - throw error เพื่อ trigger retry
const worker = new Worker('email-queue', async (job) => {
  try {
    // พยายามส่ง email
    await sendEmail(job.data);
  } catch (error) {
    // Check ว่าเป็น error ที่ควร retry หรือไม่
    if (error.code === 'RATE_LIMIT') {
      // throw เพื่อ retry
      throw error;
    } else {
      // ไม่ retry - ถือว่า job failed ถาวร
      await job.moveToFailed(error, 'No retry for this error');
      return;
    }
  }
});

Dead Letter Queue

จัดการ jobs ที่ fail ถาวร:

dead-letter-queue.js
const { Queue, Worker } = require('bullmq');

// Main queue
const mainQueue = new Queue('main-queue');

// Dead Letter Queue
const dlq = new Queue('dead-letter-queue');

const worker = new Worker('main-queue', async (job) => {
  try {
    await processJob(job);
  } catch (error) {
    // ถ้า retry ครบแล้ว ย้ายไป DLQ
    if (job.attemptsMade >= job.opts.attempts) {
      await dlq.add('failed-job', {
        originalJob: job.data,
        error: error.message,
        failedAt: new Date(),
        attemptsMade: job.attemptsMade,
      });
    }
    throw error;
  }
}, {
  // Maximum attempts
  attempts: 3,
  
  backoff: {
    type: 'exponential',
    delay: 1000,
  },
});

// Worker สำหรับ DLQ - ส่ง notification หรือ log
const dlqWorker = new Worker('dead-letter-queue', async (job) => {
  console.error('Job failed permanently:', job.data);
  
  // ส่ง alert ไปทีม dev
  await sendAlert({
    type: 'job_failed',
    data: job.data,
  });
});

8. Production Deployment Tips

Redis Configuration

// redis.conf
maxmemory 2gb
maxmemory-policy allkeys-lru
save ""  # ปิด persistence ถ้าไม่จำเป็น

Worker Configuration

const worker = new Worker('my-queue', processor, {
  connection: {
    host: process.env.REDIS_HOST,
    port: 6379,
    maxRetriesPerRequest: null, // สำคัญ!
    enableReadyCheck: false,
  },
  concurrency: 10, // จำนวน concurrent jobs
  limiter: {
    max: 100, // max jobs
    duration: 1000, // per second
  },
});

Graceful Shutdown

process.on('SIGTERM', async () => {
  console.log('Shutting down worker...');
  
  // หยุดรับ jobs ใหม่
  await worker.close();
  
  // รอ jobs ที่กำลังทำอยู่เสร็จ
  // (worker.close() จะทำให้)
  
  process.exit(0);
});

9. Best Practices

Keep Jobs Small

เก็บเฉพาะ references ใน job data ไม่เก็บไฟล์หรือข้อมูลขนาดใหญ่

Idempotent Jobs

ออกแบบให้ job สามารถรันซ้ำได้โดยไม่เกิด side effects

Secure Redis

ใช้ Redis ACLs และ TLS ใน production environment

Monitor Queue Health

ติดตาม waiting, active, completed, failed counts

Set Reasonable Timeouts

กำหนด timeout สำหรับแต่ละ job type

สรุป

BullMQ เป็น solution ที่ยอดเยี่ยมสำหรับ Node.js applications ที่ต้องการ background job processing โดยมีจุดเด่น:

  • ใช้ Redis ทำให้ scale ได้ง่าย
  • รองรับ TypeScript อย่างสมบูรณ์
  • Features ครบครัน (retries, delays, priorities, flows)
  • Active community และ documentation ดี
  • Production-ready และ battle-tested
npm install bullmq Open Source MIT License

แชร์บทความนี้