1 บทนำ: ทำไมต้อง RabbitMQ + Node.js?
ในโลกของการพัฒนาแอปพลิเคชันสมัยใหม่ สิ่งหนึ่งที่นักพัฒนาต้องเผชิญคือความต้องการในการสร้างระบบที่ "สามารถขยายได้ (Scalable)", "ทนทานต่อความล้มเหลว (Resilient)", และ "สามารถทำงานแบบไม่ประสานเวลากัน (Asynchronous)" RabbitMQ คือคำตอบสำหรับความท้าทายเหล่านี้
Asynchronous Processing
ไม่ต้องรอให้ task หนึ่งเสร็จก่อนเริ่ม task ถัดไป ตัวอย่างเช่น ส่งอีเมลแจ้งเตือน หรือประมวลผลวิดีโอ
Decoupling
แยกบริการต่างๆ ออกจากกัน แต่ยังสามารถสื่อสารกันได้ผ่าน message queue อย่างมีประสิทธิภาพ
Load Balancing
กระจายภาระงานหลายๆ ชิ้นให้กับ consumers หลายคน ช่วยเพิ่ม throughput และ reliability
Backpressure
เมื่อ consumer ไม่สามารถประมวลผลได้ทัน queue จะเก็บข้อความไว้ในหน่วยความจำ/ดิสก์แทนที่จะล้นระบบ
ใช้ RabbitMQ เมื่อไหร่?
- ต้องการส่งอีเมลแจ้งเตือน หรือ SMS โดยไม่ให้ blocking UI
- ต้องการสร้าง Microservices ที่สามารถ scale ได้อิสระ
- ต้องการประมวลผลรูปภาพ/วิดีโอขนาดใหญ่
- ต้องการระบบ logging แบบ distributed
- มี peak traffic และต้องการ buffer สำหรับ handle load สูง
Architecture Diagram: Asynchronous Processing Pattern
ภาพรวมของระบบเมื่อใช้ RabbitMQ กับ Node.js — ผู้ใช้ส่ง request → แอปประมวลผลหลัก → วางข้อความใน queue → ตอบกลับทันที → Consumer ประมวลผลในพื้นหลัง
คำอธิบาย:
- Producer: แอปที่สร้างข้อความ (เช่น API endpoint ที่รับ order)
- RabbitMQ: Message Broker — เก็บข้อความไว้ใน queue จนกว่า consumer จะพร้อมรับ
- Consumer: แอปที่ประมวลผลข้อความ (เช่น worker ที่ส่งอีเมล)
- Response: ตอบกลับ client ทันทีโดยไม่ต้องรอให้ task เสร็จ
2 Prerequisites (สิ่งที่ต้องเตรียม)
Node.js & npm
ติดตั้ง Node.js เวอร์ชัน 16 ขึ้นไป (LTS แนะนำ)
v18.19.0
$ npm --version
9.2.0
Docker & Docker Compose
ติดตั้ง Docker Desktop หรือ Docker Engine + Compose plugin
Docker version 24.0.5
$ docker compose version
Docker Compose version v2.20.0
Bonus: VS Code + Extensions (แนะนำ)
3 RabbitMQ คืออะไร? AMQP Protocol พื้นฐาน
Producer (ผู้ส่ง)
แอปพลิเคชันที่สร้างและส่งข้อความ (message) ไปยัง RabbitMQ
Queue (คิว)
Buffer ที่ RabbitMQ เก็บข้อความไว้ — queue สามารถรองรับข้อความได้ไม่จำกัด (ขึ้นกับ RAM/Disk)
Consumer (ผู้รับ)
แอปพลิเคชันที่รอรับและประมวลผลข้อความจาก queue
AMQP Protocol — Advanced Message Queuing Protocol
AMQP เป็น "protocol แบบ open-standard" สำหรับ messaging queue ที่ RabbitMQ ใช้
คุณสมบัติสำคัญ
- -message durability (เก็บข้อความไว้ในดิสก์)
- acknowledgement (Consumer ยืนยันการรับ)
- routing (ส่งข้อความไปยัง queue ที่ถูกต้อง)
- transactions (guaranteed delivery)
Architecture Components
- Exchange: ตัวส่งข้อความไปยัง queue (ตาม routing key)
- Binding: เส้นทางที่เชื่อม exchange กับ queue
- Routing Key: ป้ายกำกับเพื่อ routing ข้อความ
ตัวอย่างใช้งานจริง
สมมติคุณมีเว็บไซต์ขายของออนไลน์: เมื่อลูกค้าสั่งสินค้า (order), แอปของคุณจะ:
- บันทึก order ลง database
- สร้างข้อความ "ORDER_CREATED" ไปยัง RabbitMQ
- ตอบกลับลูกค้าทันทีว่า "Order placed successfully!"
- Worker พร้อมกันจะรับข้อความจาก queue ไปประมวลผล:
- ส่งอีเมลยืนยันคำสั่งซื้อ
- อัปเดต stock system
- สร้างpacking list
4 Step 1: Docker Compose Setup (RabbitMQ + MongoDB)
สร้างไฟล์ docker-compose.yml ที่จะ setup RabbitMQ พร้อม management UI และ MongoDB (สำหรับเก็บ data)
version: '3.8'
services:
# RabbitMQ server with management UI
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- "5672:5672" # AMQP protocol port (สำหรับ producer/consumer)
- "15672:15672" # Management UI port (สำหรับเข้า web interface)
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
volumes:
- rabbitmq_data:/var/lib/rabbitmq
# MongoDB database (สำหรับเก็บข้อมูล app เช่น orders, users)
mongodb:
image: mongodb/mongodb-community-server:latest
container_name: mongodb
ports:
- "27017:27017"
environment:
MONGO_INITDB_ROOT_USERNAME: root
MONGO_INITDB_ROOT_PASSWORD: example
MONGO_INITDB_DATABASE: app
volumes:
- mongodb_data:/data/db
volumes:
rabbitmq_data:
mongodb_data:
คำสั่งที่ต้องรัน
เชื่อมต่อ MongoDB & RabbitMQ จาก Node.js
RABBITMQ_URL=amqp://guest:guest@localhost:5672
หมายเหตุ: RabbitMQ มี credentials ค่าง default เป็น guest/guest (เฉพาะ localhost)
ส่วน MongoDB เราตั้ง root password เป็น example
ตรวจสอบ RabbitMQ Status
- เปิด browser เข้า: http://localhost:15672
- username:
guest, password:guest - ควรเห็นหน้าจอ login ของ management UI
- หลัง login → Overview tab → จะเห็น statistics ของ queue/exchange
5 Step 2: Node.js Producer (amqplib)
Producer คือแอปที่ส่งข้อความ (message) ไปยัง RabbitMQ queue เราจะใช้ library amqplib
$ npm init -y
$ npm install amqplib
// producer.js — ส่งข้อความไปยัง RabbitMQ
const amqp = require('amqplib/callback_api');
const QUEUE_NAME = 'tasks';
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';
function sendMessage(message) {
amqp.connect(RABBITMQ_URL, (err, connection) => {
if (err) {
console.error('❌ RabbitMQ connection error:', err.message);
return;
}
console.log('✅ Connected to RabbitMQ!');
connection.createChannel((err, channel) => {
if (err) {
console.error('❌ Channel creation error:', err.message);
return;
}
// Ensure the queue exists (idempotent)
channel.assertQueue(QUEUE_NAME, {
durable: true, // Queue จะไม่หายไปหลัง RabbitMQ restart
messageTimetToLive: 600000 // ล้างข้อความเก่าที่ค้าง 10 นาที
});
// Send message
const messageBuffer = Buffer.from(JSON.stringify(message));
channel.sendToQueue(QUEUE_NAME, messageBuffer, {
persistent: true // Message จะไม่หายไปถ้า RabbitMQ crash
});
console.log('📦 Message sent:', JSON.stringify(message, null, 2));
// Close after sending
setTimeout(() => {
channel.close();
connection.close();
}, 500);
});
});
}
// Example usage
const task = {
type: 'send_email',
payload: {
to: 'customer@example.com',
subject: 'Welcome to our platform!',
template: 'welcome_email'
},
priority: 1,
createdAt: new Date().toISOString()
};
console.log('🚀 Sending task to RabbitMQ...');
sendMessage(task);
Run Producer
ควรเห็น output ว่าMessage sent และข้อความจะถูกส่งไปที่ queue tasks
Flow Diagram: Producer → Queue
Process flow: Producer → (createChannel) → assertQueue → sendToQueue → (persistent message)
6 Step 3: Node.js Consumer (amqplib)
Consumer คือแอปที่รอรับและประมวลผลข้อความจาก queue ซึ่งจะทำงานแบบ "continuous" (ไม่จบโปรแกรม)
$ npm init -y
$ npm install amqplib
// consumer.js — รับและประมวลผลข้อความจาก RabbitMQ
const amqp = require('amqplib/callback_api');
const QUEUE_NAME = 'tasks';
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';
// จำลองการส่งอีเมล
function sendEmail(payload) {
return new Promise((resolve, reject) => {
console.log(`📧 Email service: Sending to ${payload.to}...`);
// Simulate email sending delay
setTimeout(() => {
console.log(`✅ Email sent to ${payload.to}`);
resolve();
}, 1000);
});
}
function processMessage(message) {
const messageData = JSON.parse(message.content.toString());
console.log('📦 Message received:', JSON.stringify(messageData, null, 2));
if (messageData.type === 'send_email') {
return sendEmail(messageData.payload)
.then(() => {
console.log('✅ Email task completed');
})
.catch(err => {
console.error('❌ Email task failed:', err.message);
throw err;
});
}
console.log('⚠️ Unknown message type:', messageData.type);
}
function startConsumer() {
amqp.connect(RABBITMQ_URL, (err, connection) => {
if (err) {
console.error('❌ RabbitMQ connection error:', err.message);
console.log('🔄 Retrying in 5 seconds...');
setTimeout(startConsumer, 5000);
return;
}
console.log('✅ Connected to RabbitMQ!');
connection.createChannel((err, channel) => {
if (err) {
console.error('❌ Channel creation error:', err.message);
return;
}
channel.assertQueue(QUEUE_NAME, {
durable: true
});
// ตั้งค่าความเร็วในการประมวลผล — 1 ข้อความต่อครั้ง
channel.prefetch(1);
console.log('⏳ Waiting for messages in queue:', QUEUE_NAME);
// Start consuming messages
channel.consume(QUEUE_NAME, (msg) => {
processMessage(msg)
.then(() => {
// Acknowledge after success
channel.ack(msg);
})
.catch(() => {
// Reject and requeue (หรือ dead-letter queue)
channel.nack(msg, false, true);
});
}, {
noAck: false // ต้อง ack manually (เพื่อ reliability)
});
});
});
}
// Run with auto-reconnect
startConsumer();
// Graceful shutdown
process.on('SIGINT', () => {
console.log('\n👋 Shutting down consumer...');
process.exit(0);
});
Run Consumer
- Consumer จะรันแบบ continuous — ไม่จบโปรแกรม
- เมื่อมี message เข้ามาใน queue → process → ack → next message
- ถ้า crash หรือ error → message จะถูก requeue (รอกลับมาใหม่)
Flow Diagram: Queue → Consumer
Key concepts: channel.consume() — รอรับ message, channel.ack() — ยืนยันการรับ
7 Step 4: Microservices Pattern (Auth, Products, Orders)
ในแอปขนาดใหญ่ เราจะแยกเป็น "Microservices" หลายตัว ที่สื่อสารกันผ่าน RabbitMQ queue
บริการ (Services)
-
1
Auth Service (Port 3000) — จัดการ user login/register
-
2
Products Service (Port 3001) — จัดการ product catalog
-
3
Orders Service (Port 3002) — จัดการ order
-
4
Notifications Service (Port 3003) — ส่งอีเมล/แจ้งเตือน
_queues_
-
U
user.created — Auth → Notification
-
P
product.updated — Products → Notifications
-
O
order.created — Orders → Notification, Inventory
-
N
notification.send — All → Notification Worker
Full Architecture Diagram: Microservices Pattern
Benefits
- แยก service ได้อิสระ → scalability
- Decoupling → แก้ไขตัว service ได้โดยไม่กระทบอื่น
- Fault isolation — one service down ≠ all down
Challenges
- Complexity — debugging, monitoring ยากขึ้น
- Network latency — cross-service calls
- Consistency — eventual consistency model
Implementation
- HTTP API — REST/GraphQL (synchronous)
- Message Queue — RabbitMQ/Kafka (async)
- Service Discovery — Consul/Etcd
8 Management Dashboard UI Walkthrough
RabbitMQ มี Management Plugin ที่มาพร้อมกับ image rabbitmq:3-management
- URL: http://localhost:15672
- Username:
guest, Password:guest(default)
Figure 5: RabbitMQ Management Dashboard UI (simplified)
Overview Tab
- 📊 Total queues, exchanges, consumers
- 📈 Message rates (in/out, acks)
- 💾 Memory/disk usage per node
Queues Tab
- 📦 List all queues + messages count
- ⏱️ Rate metrics (deliver/get, acks)
- 🔧 Actions: Purge, Delete, Publish
Admin Tab
- 👥 User management (add/delete users)
- 🔐 Permissions (vhosts, resources)
- 🔄 Policy management (HA, TTL, etc)
Tips สำหรับการใช้ Management UI
- ตั้ง username/password ใหม่ (อย่าใช้ guest/guest ตลอด)
- ใช้ Policies เพื่อกำหนด queue TTL, max-length, HA
- ตรวจสอบคิวเป็นระยะๆ — ถ้า messages count สูงขึ้น แสดงว่า consumer ทำงานไม่ทัน
- ดู Channels tab — ถ้ามี connection leak (connections ที่ไม่ปิด)
9 Monitoring & Troubleshooting
Monitoring RabbitMQ เหมือน vigilance ของระบบ messaging — ช่วยให้คุณตรวจจับปัญหาได้ก่อนที่จะเกิดใหญ่
ตัวชี้วัด (Metrics) สำคัญ
- Messages in/out: Rate ที่ข้อความเข้า-ออก queue
- Queue depth: จำนวนข้อความค้างใน queue (ควรต่ำ)
- Consumers: จำนวน consumer ที่เชื่อมต่อ
- Memory usage: RabbitMQ ใช้ RAM เท่าไหร่
- Open file descriptors: ตรวจสอบ resource leak
วิธี Monitoring
- RabbitMQ Management UI: http://localhost:15672 (basic)
- Prometheus + Grafana: สำหรับ production cluster
- rabbitmqadmin CLI: API client สำหรับ automation
- Logs: /var/log/rabbitmq/rabbit@*.log
Troubleshooting: Common Issues
Issue: Queue messages keep growing
consumer ไม่สามารถประมวลผลได้ทันกับ producer ที่ส่งเข้ามา
- ✅ เพิ่ม consumer จำนวน
- ✅ ตรวจสอบ logs — มี crash/error หรือไม่
- ✅ เพิ่ม queue memory/disk limit (prevent OOM)
Issue: Connection refused / ECONNREFUSED
RabbitMQ ไม่พร้อมใช้งาน หรือ port ผิด
- ✅ ตรวจสอบ container status:
docker ps - ✅ ตรวจสอบ logs:
docker logs rabbitmq - ✅ ลอง ping:
docker exec -it rabbitmq rabbitmq-diagnostics ping
Issue: Authentication failed
ตรวจสอบ credentials ให้ถูกต้อง
- ✅ Docker:
RABBITMQ_DEFAULT_USER,RABBITMQ_DEFAULT_PASS - ✅ localhost ใช้
guest/guestจริง แต่ remote มี firewall
Best Monitoring Setup (Production)
Stack: Prometheus + Grafana
- 📊 Docker Prometheus exporter:
prom/rabbitmq-exporter - 📈 Grafana dashboard: 10991 (RabbitMQ Overview)
- 🔔 Alert rules: Queue depth > threshold, consumers down
Stack: OpenTelemetry + Jaeger
- 📦 Distributed tracing ( RabbitMQ → service calls)
- 🔍 Correlate errors across services
- ⏱️ Measure end-to-end latency
10 Best Practices & Tips
เทคนิคต่างๆ ที่จะช่วยให้คุณใช้ RabbitMQ ได้อย่างปลอดภัย และมีประสิทธิภาพสูงสุด
Security
- 1. ตั้ง credentials ใหม่: หลีกเลี่ยง guest/guest
- 2. ใช้ TLS/SSL: ปิดการสื่อสารที่ไม่ปลอดภัย
- 3. จำกัด vhosts: แยก env (dev/staging/prod)
- 4. ใช้ token-based: AMQPS หรือ OAuth2
Performance
- 1. ตั้ง prefetch:
channel.prefetch(10) - 2. ใช้ batch processing: จัดกลุ่มข้อความก่อนประมวลผล
- 3. ตั้ง durable + message TTL: ล้างข้อความเก่า
- 4. Horizontal scale: เพิ่ม consumer nodes
Node.js Code Patterns
// Connection reuse pattern (อย่า create connection ทุก request)
const amqp = require('amqplib/callback_api');
let channel;
function getChannel() {
return new Promise((resolve, reject) => {
if (channel) return resolve(channel);
amqp.connect(process.env.RABBITMQ_URL, (err, conn) => {
if (err) return reject(err);
conn.createChannel((err, ch) => {
if (err) return reject(err);
channel = ch;
resolve(ch);
});
});
});
}
// Usage
async function sendMessage(message) {
const ch = await getChannel();
ch.assertQueue('tasks', { durable: true });
ch.sendToQueue('tasks', Buffer.from(JSON.stringify(message)));
}
Connection reuse: สร้าง connection ครั้งเดียวแล้ว reuse — ลด overhead
คู่มือตัดสินใจ: ใช้ Queue หรือไม่?
ถามตัวเอง 3 คำถามนี้ก่อนใช้ RabbitMQ:
ต้องการประมวลผลแบบไม่ประสานเวลากัน (Asynchronous)?
เช่น ส่งอีเมล, ประมวลผลวิดีโอ — ตอบ "ใช่"
งานมีความซับซ้อน/ใช้เวลา (1 วินาทีขึ้นไป)?
ถ้าใช้ HTTP API แล้ว timeout — ใช้ queue จะดีกว่า
มีหลาย services ที่ต้องสื่อสารกัน?
Microservices — queue คือ lifeblood ของระบบ
ถ้าตอบ "ใช่" มากกว่า 1 ข้อ → ควรใช้ RabbitMQ!
11 FAQ & Common Issues
ฉันจะรู้ได้ยังไงว่า queue มีข้อความค้าง?
เข้า Management UI → Queues tab → เห็นคอลัมน์ "Messages" (unacked + ready)
หรือ: ใช้ CLI rabbitmqadmin list queues name messages
ถ้า consumer crash ข้อความจะหายไหม?
ไม่หาย! เพราะ Node.js ใช้ noAck: false และ channel.ack() เมื่อประมวลผลสำเร็จ
ถ้า consumer crash: message จะถูก requeue (กลับมาใหม่ในคิว)
RabbitMQ กับ Redis Queue ต่างกันยังไง?
RabbitMQ: AMQP protocol, persistent, robust, งานหนักๆ ที่ต้องการความน่าเชื่อถือสูง
Redis Queue: In-memory, fast, สำหรับ lightweight tasks
ใช้ RabbitMQ เมื่อต้องการ durability + guaranteed delivery
How to scale RabbitMQ to cluster?
1. Setup multi-node cluster: rabbitmq-server -detached
2. Join nodes: rabbitmqctl join_cluster rabbit@node1
3. Setup HA policy: rabbitmqctl set_policy HA '.*' '{"ha-mode":"all"}'
Best practice สำหรับ message size?
สูงสุดที่แนะนำ: 1MB ต่อข้อความ
ถ้าข้อมูลใหญ่กว่านั้น:
- Upload ไปยัง object storage (S3/GCS) → ส่ง URL ใน message
- Use message compression (gzip)
How to monitor RabbitMQ from Node.js?
ใช้ rabbitmq-admin-api หรือ axios/fetch เรียก management API:
GET http://localhost:15672/api/queues/%2F