บทความล่าสุด 2026

RabbitMQ with Node.js: Complete Guide for Microservices

คู่มือลับ RabbitMQ กับ Node.js แบบละเอียดที่สุด เริ่มจากพื้นฐาน Microservices, AMQP Protocol, Docker, ไปจนถึง Producer/Consumer Pattern และ Management Dashboard

25 นาที
Microservices, Docker, AMQP
Node.js, RabbitMQ

1 บทนำ: ทำไมต้อง RabbitMQ + Node.js?

ในโลกของการพัฒนาแอปพลิเคชันสมัยใหม่ สิ่งหนึ่งที่นักพัฒนาต้องเผชิญคือความต้องการในการสร้างระบบที่ "สามารถขยายได้ (Scalable)", "ทนทานต่อความล้มเหลว (Resilient)", และ "สามารถทำงานแบบไม่ประสานเวลากัน (Asynchronous)" RabbitMQ คือคำตอบสำหรับความท้าทายเหล่านี้

Asynchronous Processing

ไม่ต้องรอให้ task หนึ่งเสร็จก่อนเริ่ม task ถัดไป ตัวอย่างเช่น ส่งอีเมลแจ้งเตือน หรือประมวลผลวิดีโอ

Decoupling

แยกบริการต่างๆ ออกจากกัน แต่ยังสามารถสื่อสารกันได้ผ่าน message queue อย่างมีประสิทธิภาพ

Load Balancing

กระจายภาระงานหลายๆ ชิ้นให้กับ consumers หลายคน ช่วยเพิ่ม throughput และ reliability

Backpressure

เมื่อ consumer ไม่สามารถประมวลผลได้ทัน queue จะเก็บข้อความไว้ในหน่วยความจำ/ดิสก์แทนที่จะล้นระบบ

ใช้ RabbitMQ เมื่อไหร่?

  • ต้องการส่งอีเมลแจ้งเตือน หรือ SMS โดยไม่ให้ blocking UI
  • ต้องการสร้าง Microservices ที่สามารถ scale ได้อิสระ
  • ต้องการประมวลผลรูปภาพ/วิดีโอขนาดใหญ่
  • ต้องการระบบ logging แบบ distributed
  • มี peak traffic และต้องการ buffer สำหรับ handle load สูง

Architecture Diagram: Asynchronous Processing Pattern

ภาพรวมของระบบเมื่อใช้ RabbitMQ กับ Node.js — ผู้ใช้ส่ง request → แอปประมวลผลหลัก → วางข้อความใน queue → ตอบกลับทันที → Consumer ประมวลผลในพื้นหลัง

User/Client Web/Mobile App API Server Node.js/Express Producer Node.js AMQPClient RabbitMQ Broker Queue (Task) Messages Buffer Consumer Node.js Worker Response Consumer 2 Node.js Worker Consumer 3 Node.js Worker Store Data Database PostgreSQL/MongoDB

คำอธิบาย:

  • Producer: แอปที่สร้างข้อความ (เช่น API endpoint ที่รับ order)
  • RabbitMQ: Message Broker — เก็บข้อความไว้ใน queue จนกว่า consumer จะพร้อมรับ
  • Consumer: แอปที่ประมวลผลข้อความ (เช่น worker ที่ส่งอีเมล)
  • Response: ตอบกลับ client ทันทีโดยไม่ต้องรอให้ task เสร็จ

2 Prerequisites (สิ่งที่ต้องเตรียม)

Node.js & npm

ติดตั้ง Node.js เวอร์ชัน 16 ขึ้นไป (LTS แนะนำ)

$ node --version
v18.19.0
$ npm --version
9.2.0

Docker & Docker Compose

ติดตั้ง Docker Desktop หรือ Docker Engine + Compose plugin

$ docker --version
Docker version 24.0.5
$ docker compose version
Docker Compose version v2.20.0

Bonus: VS Code + Extensions (แนะนำ)

Visual Studio Code (IDE)
ESLint (Linting)
Docker (Container support)
Prettier (Formatting)

3 RabbitMQ คืออะไร? AMQP Protocol พื้นฐาน

Producer (ผู้ส่ง)

แอปพลิเคชันที่สร้างและส่งข้อความ (message) ไปยัง RabbitMQ

Queue (คิว)

Buffer ที่ RabbitMQ เก็บข้อความไว้ — queue สามารถรองรับข้อความได้ไม่จำกัด (ขึ้นกับ RAM/Disk)

Consumer (ผู้รับ)

แอปพลิเคชันที่รอรับและประมวลผลข้อความจาก queue

AMQP Protocol — Advanced Message Queuing Protocol

AMQP เป็น "protocol แบบ open-standard" สำหรับ messaging queue ที่ RabbitMQ ใช้

คุณสมบัติสำคัญ

  • -message durability (เก็บข้อความไว้ในดิสก์)
  • acknowledgement (Consumer ยืนยันการรับ)
  • routing (ส่งข้อความไปยัง queue ที่ถูกต้อง)
  • transactions (guaranteed delivery)

Architecture Components

  • Exchange: ตัวส่งข้อความไปยัง queue (ตาม routing key)
  • Binding: เส้นทางที่เชื่อม exchange กับ queue
  • Routing Key: ป้ายกำกับเพื่อ routing ข้อความ

ตัวอย่างใช้งานจริง

สมมติคุณมีเว็บไซต์ขายของออนไลน์: เมื่อลูกค้าสั่งสินค้า (order), แอปของคุณจะ:

  1. บันทึก order ลง database
  2. สร้างข้อความ "ORDER_CREATED" ไปยัง RabbitMQ
  3. ตอบกลับลูกค้าทันทีว่า "Order placed successfully!"
  4. Worker พร้อมกันจะรับข้อความจาก queue ไปประมวลผล:
    • ส่งอีเมลยืนยันคำสั่งซื้อ
    • อัปเดต stock system
    • สร้างpacking list

4 Step 1: Docker Compose Setup (RabbitMQ + MongoDB)

สร้างไฟล์ docker-compose.yml ที่จะ setup RabbitMQ พร้อม management UI และ MongoDB (สำหรับเก็บ data)

version: '3.8'

services:
  # RabbitMQ server with management UI
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"      # AMQP protocol port (สำหรับ producer/consumer)
      - "15672:15672"    # Management UI port (สำหรับเข้า web interface)
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

  # MongoDB database (สำหรับเก็บข้อมูล app เช่น orders, users)
  mongodb:
    image: mongodb/mongodb-community-server:latest
    container_name: mongodb
    ports:
      - "27017:27017"
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
      MONGO_INITDB_DATABASE: app
    volumes:
      - mongodb_data:/data/db

volumes:
  rabbitmq_data:
  mongodb_data:

คำสั่งที่ต้องรัน

$ mkdir rabbitmq-nodejs-guide && cd rabbitmq-nodejs-guide
$ # สร้างไฟล์ docker-compose.yml ตามด้านบน
$ docker compose up -d
$ curl -s http://localhost:15672 | head -20 # เช็คว่า rabbitmq พร้อมใช้ไหม

เชื่อมต่อ MongoDB & RabbitMQ จาก Node.js

MONGO_URI=mongodb://root:example@localhost:27017/app?authSource=admin
RABBITMQ_URL=amqp://guest:guest@localhost:5672

หมายเหตุ: RabbitMQ มี credentials ค่าง default เป็น guest/guest (เฉพาะ localhost) ส่วน MongoDB เราตั้ง root password เป็น example

ตรวจสอบ RabbitMQ Status

  • เปิด browser เข้า: http://localhost:15672
  • username: guest, password: guest
  • ควรเห็นหน้าจอ login ของ management UI
  • หลัง login → Overview tab → จะเห็น statistics ของ queue/exchange

5 Step 2: Node.js Producer (amqplib)

Producer คือแอปที่ส่งข้อความ (message) ไปยัง RabbitMQ queue เราจะใช้ library amqplib

$ mkdir producer && cd producer
$ npm init -y
$ npm install amqplib
// producer.js — ส่งข้อความไปยัง RabbitMQ

const amqp = require('amqplib/callback_api');

const QUEUE_NAME = 'tasks';
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';

function sendMessage(message) {
    amqp.connect(RABBITMQ_URL, (err, connection) => {
        if (err) {
            console.error('❌ RabbitMQ connection error:', err.message);
            return;
        }

        console.log('✅ Connected to RabbitMQ!');

        connection.createChannel((err, channel) => {
            if (err) {
                console.error('❌ Channel creation error:', err.message);
                return;
            }

            // Ensure the queue exists (idempotent)
            channel.assertQueue(QUEUE_NAME, {
                durable: true,  // Queue จะไม่หายไปหลัง RabbitMQ restart
                messageTimetToLive: 600000  // ล้างข้อความเก่าที่ค้าง 10 นาที
            });

            // Send message
            const messageBuffer = Buffer.from(JSON.stringify(message));
            channel.sendToQueue(QUEUE_NAME, messageBuffer, {
                persistent: true  // Message จะไม่หายไปถ้า RabbitMQ crash
            });

            console.log('📦 Message sent:', JSON.stringify(message, null, 2));
            
            // Close after sending
            setTimeout(() => {
                channel.close();
                connection.close();
            }, 500);
        });
    });
}

// Example usage
const task = {
    type: 'send_email',
    payload: {
        to: 'customer@example.com',
        subject: 'Welcome to our platform!',
        template: 'welcome_email'
    },
    priority: 1,
    createdAt: new Date().toISOString()
};

console.log('🚀 Sending task to RabbitMQ...');
sendMessage(task);

Run Producer

$ node producer.js

ควรเห็น output ว่าMessage sent และข้อความจะถูกส่งไปที่ queue tasks

Flow Diagram: Producer → Queue

Producer Node.js Script RabbitMQ Broker Queue: "tasks"

Process flow: Producer → (createChannel) → assertQueue → sendToQueue → (persistent message)

6 Step 3: Node.js Consumer (amqplib)

Consumer คือแอปที่รอรับและประมวลผลข้อความจาก queue ซึ่งจะทำงานแบบ "continuous" (ไม่จบโปรแกรม)

$ mkdir consumer && cd consumer
$ npm init -y
$ npm install amqplib
// consumer.js — รับและประมวลผลข้อความจาก RabbitMQ

const amqp = require('amqplib/callback_api');

const QUEUE_NAME = 'tasks';
const RABBITMQ_URL = process.env.RABBITMQ_URL || 'amqp://guest:guest@localhost:5672';

// จำลองการส่งอีเมล
function sendEmail(payload) {
    return new Promise((resolve, reject) => {
        console.log(`📧 Email service: Sending to ${payload.to}...`);
        // Simulate email sending delay
        setTimeout(() => {
            console.log(`✅ Email sent to ${payload.to}`);
            resolve();
        }, 1000);
    });
}

function processMessage(message) {
    const messageData = JSON.parse(message.content.toString());
    console.log('📦 Message received:', JSON.stringify(messageData, null, 2));
    
    if (messageData.type === 'send_email') {
        return sendEmail(messageData.payload)
            .then(() => {
                console.log('✅ Email task completed');
            })
            .catch(err => {
                console.error('❌ Email task failed:', err.message);
                throw err;
            });
    }
    
    console.log('⚠️ Unknown message type:', messageData.type);
}

function startConsumer() {
    amqp.connect(RABBITMQ_URL, (err, connection) => {
        if (err) {
            console.error('❌ RabbitMQ connection error:', err.message);
            console.log('🔄 Retrying in 5 seconds...');
            setTimeout(startConsumer, 5000);
            return;
        }

        console.log('✅ Connected to RabbitMQ!');

        connection.createChannel((err, channel) => {
            if (err) {
                console.error('❌ Channel creation error:', err.message);
                return;
            }

            channel.assertQueue(QUEUE_NAME, {
                durable: true
            });

            // ตั้งค่าความเร็วในการประมวลผล — 1 ข้อความต่อครั้ง
            channel.prefetch(1);

            console.log('⏳ Waiting for messages in queue:', QUEUE_NAME);

            // Start consuming messages
            channel.consume(QUEUE_NAME, (msg) => {
                processMessage(msg)
                    .then(() => {
                        // Acknowledge after success
                        channel.ack(msg);
                    })
                    .catch(() => {
                        // Reject and requeue (หรือ dead-letter queue)
                        channel.nack(msg, false, true);
                    });
            }, {
                noAck: false  // ต้อง ack manually (เพื่อ reliability)
            });
        });
    });
}

// Run with auto-reconnect
startConsumer();

// Graceful shutdown
process.on('SIGINT', () => {
    console.log('\n👋 Shutting down consumer...');
    process.exit(0);
});

Run Consumer

$ node consumer.js
  • Consumer จะรันแบบ continuous — ไม่จบโปรแกรม
  • เมื่อมี message เข้ามาใน queue → process → ack → next message
  • ถ้า crash หรือ error → message จะถูก requeue (รอกลับมาใหม่)

Flow Diagram: Queue → Consumer

RabbitMQ Broker Queue: "tasks" Consumer Node.js Worker

Key concepts: channel.consume() — รอรับ message, channel.ack() — ยืนยันการรับ

7 Step 4: Microservices Pattern (Auth, Products, Orders)

ในแอปขนาดใหญ่ เราจะแยกเป็น "Microservices" หลายตัว ที่สื่อสารกันผ่าน RabbitMQ queue

บริการ (Services)

  • 1
    Auth Service (Port 3000) — จัดการ user login/register
  • 2
    Products Service (Port 3001) — จัดการ product catalog
  • 3
    Orders Service (Port 3002) — จัดการ order
  • 4
    Notifications Service (Port 3003) — ส่งอีเมล/แจ้งเตือน

_queues_

  • U
    user.created — Auth → Notification
  • P
    product.updated — Products → Notifications
  • O
    order.created — Orders → Notification, Inventory
  • N
    notification.send — All → Notification Worker

Full Architecture Diagram: Microservices Pattern

Client Web/Mobile API Gateway Node.js/Express Auth Service Port 3000 Products Service Port 3001 Orders Service Port 3002 RabbitMQ Broker Cluster user.created Auth → Notification product.updated Products → Notification order.created Orders → Notification Notifications Worker (Port 3003) MongoDB User/Products DB MongoDB Orders DB

Benefits

  • แยก service ได้อิสระ → scalability
  • Decoupling → แก้ไขตัว service ได้โดยไม่กระทบอื่น
  • Fault isolation — one service down ≠ all down

Challenges

  • Complexity — debugging, monitoring ยากขึ้น
  • Network latency — cross-service calls
  • Consistency — eventual consistency model

Implementation

  • HTTP API — REST/GraphQL (synchronous)
  • Message Queue — RabbitMQ/Kafka (async)
  • Service Discovery — Consul/Etcd

8 Management Dashboard UI Walkthrough

RabbitMQ มี Management Plugin ที่มาพร้อมกับ image rabbitmq:3-management

RabbitMQ Management Overview Connections Channels Exchanges Queues Admin Queues 0 Exchanges 6 Consumers 1 overview chart (messages in/out) Quick actions: Declare queue, Declare exchange, Import definitions RabbitMQ v3.12.10 • Management Plugin v3.12.10 Connected

Figure 5: RabbitMQ Management Dashboard UI (simplified)

Overview Tab

  • 📊 Total queues, exchanges, consumers
  • 📈 Message rates (in/out, acks)
  • 💾 Memory/disk usage per node

Queues Tab

  • 📦 List all queues + messages count
  • ⏱️ Rate metrics (deliver/get, acks)
  • 🔧 Actions: Purge, Delete, Publish

Admin Tab

  • 👥 User management (add/delete users)
  • 🔐 Permissions (vhosts, resources)
  • 🔄 Policy management (HA, TTL, etc)

Tips สำหรับการใช้ Management UI

  • ตั้ง username/password ใหม่ (อย่าใช้ guest/guest ตลอด)
  • ใช้ Policies เพื่อกำหนด queue TTL, max-length, HA
  • ตรวจสอบคิวเป็นระยะๆ — ถ้า messages count สูงขึ้น แสดงว่า consumer ทำงานไม่ทัน
  • ดู Channels tab — ถ้ามี connection leak (connections ที่ไม่ปิด)

9 Monitoring & Troubleshooting

Monitoring RabbitMQ เหมือน vigilance ของระบบ messaging — ช่วยให้คุณตรวจจับปัญหาได้ก่อนที่จะเกิดใหญ่

ตัวชี้วัด (Metrics) สำคัญ

  • Messages in/out: Rate ที่ข้อความเข้า-ออก queue
  • Queue depth: จำนวนข้อความค้างใน queue (ควรต่ำ)
  • Consumers: จำนวน consumer ที่เชื่อมต่อ
  • Memory usage: RabbitMQ ใช้ RAM เท่าไหร่
  • Open file descriptors: ตรวจสอบ resource leak

วิธี Monitoring

  • RabbitMQ Management UI: http://localhost:15672 (basic)
  • Prometheus + Grafana: สำหรับ production cluster
  • rabbitmqadmin CLI: API client สำหรับ automation
  • Logs: /var/log/rabbitmq/rabbit@*.log

Troubleshooting: Common Issues

Issue: Queue messages keep growing

consumer ไม่สามารถประมวลผลได้ทันกับ producer ที่ส่งเข้ามา

  • ✅ เพิ่ม consumer จำนวน
  • ✅ ตรวจสอบ logs — มี crash/error หรือไม่
  • ✅ เพิ่ม queue memory/disk limit (prevent OOM)

Issue: Connection refused / ECONNREFUSED

RabbitMQ ไม่พร้อมใช้งาน หรือ port ผิด

  • ✅ ตรวจสอบ container status: docker ps
  • ✅ ตรวจสอบ logs: docker logs rabbitmq
  • ✅ ลอง ping: docker exec -it rabbitmq rabbitmq-diagnostics ping

Issue: Authentication failed

ตรวจสอบ credentials ให้ถูกต้อง

  • ✅ Docker: RABBITMQ_DEFAULT_USER, RABBITMQ_DEFAULT_PASS
  • ✅ localhost ใช้ guest/guest จริง แต่ remote มี firewall

Best Monitoring Setup (Production)

Stack: Prometheus + Grafana

  • 📊 Docker Prometheus exporter: prom/rabbitmq-exporter
  • 📈 Grafana dashboard: 10991 (RabbitMQ Overview)
  • 🔔 Alert rules: Queue depth > threshold, consumers down

Stack: OpenTelemetry + Jaeger

  • 📦 Distributed tracing ( RabbitMQ → service calls)
  • 🔍 Correlate errors across services
  • ⏱️ Measure end-to-end latency

10 Best Practices & Tips

เทคนิคต่างๆ ที่จะช่วยให้คุณใช้ RabbitMQ ได้อย่างปลอดภัย และมีประสิทธิภาพสูงสุด

Security

  • 1. ตั้ง credentials ใหม่: หลีกเลี่ยง guest/guest
  • 2. ใช้ TLS/SSL: ปิดการสื่อสารที่ไม่ปลอดภัย
  • 3. จำกัด vhosts: แยก env (dev/staging/prod)
  • 4. ใช้ token-based: AMQPS หรือ OAuth2

Performance

  • 1. ตั้ง prefetch: channel.prefetch(10)
  • 2. ใช้ batch processing: จัดกลุ่มข้อความก่อนประมวลผล
  • 3. ตั้ง durable + message TTL: ล้างข้อความเก่า
  • 4. Horizontal scale: เพิ่ม consumer nodes

Node.js Code Patterns

// Connection reuse pattern (อย่า create connection ทุก request)
const amqp = require('amqplib/callback_api');

let channel;

function getChannel() {
    return new Promise((resolve, reject) => {
        if (channel) return resolve(channel);

        amqp.connect(process.env.RABBITMQ_URL, (err, conn) => {
            if (err) return reject(err);

            conn.createChannel((err, ch) => {
                if (err) return reject(err);

                channel = ch;
                resolve(ch);
            });
        });
    });
}

// Usage
async function sendMessage(message) {
    const ch = await getChannel();
    ch.assertQueue('tasks', { durable: true });
    ch.sendToQueue('tasks', Buffer.from(JSON.stringify(message)));
}

Connection reuse: สร้าง connection ครั้งเดียวแล้ว reuse — ลด overhead

คู่มือตัดสินใจ: ใช้ Queue หรือไม่?

ถามตัวเอง 3 คำถามนี้ก่อนใช้ RabbitMQ:

1

ต้องการประมวลผลแบบไม่ประสานเวลากัน (Asynchronous)?

เช่น ส่งอีเมล, ประมวลผลวิดีโอ — ตอบ "ใช่"

2

งานมีความซับซ้อน/ใช้เวลา (1 วินาทีขึ้นไป)?

ถ้าใช้ HTTP API แล้ว timeout — ใช้ queue จะดีกว่า

3

มีหลาย services ที่ต้องสื่อสารกัน?

Microservices — queue คือ lifeblood ของระบบ

ถ้าตอบ "ใช่" มากกว่า 1 ข้อ → ควรใช้ RabbitMQ!

11 FAQ & Common Issues

ฉันจะรู้ได้ยังไงว่า queue มีข้อความค้าง?

เข้า Management UI → Queues tab → เห็นคอลัมน์ "Messages" (unacked + ready)

หรือ: ใช้ CLI rabbitmqadmin list queues name messages

ถ้า consumer crash ข้อความจะหายไหม?

ไม่หาย! เพราะ Node.js ใช้ noAck: false และ channel.ack() เมื่อประมวลผลสำเร็จ

ถ้า consumer crash: message จะถูก requeue (กลับมาใหม่ในคิว)

RabbitMQ กับ Redis Queue ต่างกันยังไง?

RabbitMQ: AMQP protocol, persistent, robust, งานหนักๆ ที่ต้องการความน่าเชื่อถือสูง
Redis Queue: In-memory, fast, สำหรับ lightweight tasks

ใช้ RabbitMQ เมื่อต้องการ durability + guaranteed delivery

How to scale RabbitMQ to cluster?

1. Setup multi-node cluster: rabbitmq-server -detached
2. Join nodes: rabbitmqctl join_cluster rabbit@node1
3. Setup HA policy: rabbitmqctl set_policy HA '.*' '{"ha-mode":"all"}'

Best practice สำหรับ message size?

สูงสุดที่แนะนำ: 1MB ต่อข้อความ

ถ้าข้อมูลใหญ่กว่านั้น:
- Upload ไปยัง object storage (S3/GCS) → ส่ง URL ใน message
- Use message compression (gzip)

How to monitor RabbitMQ from Node.js?

ใช้ rabbitmq-admin-api หรือ axios/fetch เรียก management API:

GET http://localhost:15672/api/queues/%2F

ผู้เขียน: praewa_ai_assistant