تطوير الويب

طوابير غير متزامنة بـ BullMQ 5.76 وRedis 8: دليل خطوة بخطوة

5 min de lecture

📌 المقال الرئيسي: Redis 8: caching، queues، pub/sub، streams
المتطلّبات: تثبيت Redis 8

BullMQ صار في سنوات قليلة framework queue المرجعي لـ Node.js: متين، عالي الأداء، غنيّ بالميزات، ومَصون نشطًا. مبنيًّا فوق Redis، يُتيح معالجة عشرات الملايين من jobs يوميًّا على بنية تحتية متواضعة. النسخة 5.76، المنشورة في مايو 2026، تُدمج OpenTelemetry للـ tracing الموزَّع، flow producers لنمذجة DAG من jobs مترابطة، rate limiting دقيق، وdead letter queues. يبني هذا الدليل خطوة بخطوة pipeline كامل: producer، worker، retry، jobs delayed، أولويات، ومراقبة.

المتطلّبات

  • Redis 8 يعمل.
  • Node.js 22 LTS.
  • إتقان async/await.
  • الوقت: 75 دقيقة.

الخطوة 1 — تهيئة المشروع وتثبيت BullMQ

mkdir bullmq-demo
cd bullmq-demo
npm init -y
npm install bullmq

# تحقّق من النسخة المثبَّتة
node -e "console.log(require('bullmq/package.json').version)"

يجب الحصول على 5.76.8 أو أحدث. النسخ 5.x هي الفرع المستقرّ الحالي. النسخة 6 في beta. هَيِّئ type: "module" في package.json لاستعمال ES Modules.

الخطوة 2 — إنشاء producer للـ jobs

// producteur.js
import { Queue } from 'bullmq';

const connection = {
  host: '127.0.0.1',
  port: 6379,
  username: 'appuser',
  password: process.env.REDIS_PASSWORD
};

const emailQueue = new Queue('envoi-emails', { connection });

async function ajouterEmail(destinataire, sujet, corps) {
  const job = await emailQueue.add('email-transactionnel', {
    destinataire,
    sujet,
    corps,
    envoyeA: new Date().toISOString()
  }, {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 }
  });
  console.log('Job ' + job.id + ' ajouté dans la queue');
  return job.id;
}

await ajouterEmail('client@example.com', 'Confirmation commande', 'Merci pour votre achat...');
process.exit(0);

الـ queue مُحَدَّدة باسمها envoi-emails — هذا الاسم يُتقاسَم بين producer وworker. payload الـ job سيُسَلسَل تلقائيًّا إلى JSON. خيارات attempts: 3 وbackoff: { type: 'exponential', delay: 2000 } تُهَيِّئ retry: عند الإخفاق، BullMQ يُعيد المحاولة 3 مرّات بتأخير أسّي (2 ث، 4 ث، 8 ث).

الخطوة 3 — إنشاء worker يستهلك jobs

// worker.js
import { Worker } from 'bullmq';

const connection = { host: '127.0.0.1', port: 6379, username: 'appuser', password: process.env.REDIS_PASSWORD };

const worker = new Worker('envoi-emails', async (job) => {
  console.log('Traitement job ' + job.id + ' : email à ' + job.data.destinataire);

  // محاكاة إرسال عبر SMTP / API
  await new Promise(r => setTimeout(r, 500));

  // محاكاة خطأ متقطّع (10% من الوقت)
  if (Math.random() < 0.1) {
    throw new Error('Service SMTP temporairement indisponible');
  }

  return { envoye: true, ts: Date.now() };
}, {
  connection,
  concurrency: 5
});

worker.on('completed', (job, result) => {
  console.log('Job ' + job.id + ' OK :', result);
});

worker.on('failed', (job, err) => {
  console.error('Job ' + job?.id + ' echec (essai ' + job?.attemptsMade + '/' + job?.opts.attempts + ') :', err.message);
});

concurrency: 5 يُتيح للـ worker معالجة 5 jobs بالتوازي — مفيد للـ jobs I/O-bound. للـ jobs CPU-bound، شَغِّل عدّة عمليات بـ concurrency: 1 لكلّ منها عبر PM2. BullMQ يُداوم jobs المُنتَهية في Redis لـ 7 أيّام افتراضيًّا.

الخطوة 4 — اختبار pipeline كامل

# Terminal 1: شَغِّل worker
node worker.js

# Terminal 2: حقن jobs
node producteur.js
for i in {1..20}; do node producteur.js; done

الـ worker سيعرض السلسلة: « Traitement job 1… Job 1 OK ». نحو 10% من jobs سيُخفقون أوّل مرّة، يُعاد محاولتهم، وينتهون بنجاح أو إخفاق دائم. افحص حالة queue بـ redis-cli:

redis-cli --user appuser -a "$REDIS_PASSWORD" KEYS "bull:envoi-emails:*" | head
redis-cli --user appuser -a "$REDIS_PASSWORD" LLEN  "bull:envoi-emails:wait"
redis-cli --user appuser -a "$REDIS_PASSWORD" ZCARD "bull:envoi-emails:completed"
redis-cli --user appuser -a "$REDIS_PASSWORD" ZCARD "bull:envoi-emails:failed"

BullMQ يُخَزِّن jobs في عدّة بنى Redis: wait (FIFO list)، active (قيد المعالجة)، completed (sorted set)، failed، delayed.

الخطوة 5 — Jobs مُؤَجَّلة ومتكرّرة

// Job مُؤَجَّل: تنفيذ بعد 5 دقائق
await emailQueue.add('rappel', { user: 'salim@example.com' }, {
  delay: 5 * 60 * 1000 // ميلي ثانية
});

// Job متكرّر (API حديثة — موصى بها منذ BullMQ 5.16):
await emailQueue.upsertJobScheduler(
  'rapport-quotidien',                  // معرّف الـ scheduler (idempotent)
  { pattern: '0 3 * * *' },             // cron: كلّ يوم على الساعة 03:00 UTC
  { name: 'rapport-journalier', data: {}, opts: {} }
);

// إدراج الـ schedulers النشطة
const schedulers = await emailQueue.getJobSchedulers();
console.log(schedulers);

الـ delay يدفع الـ job في قائمة delayed في Redis. منذ BullMQ 5.16، الواجهة القديمة add(name, data, { repeat: ... }) مُستبعَدة لصالح upsertJobScheduler الذي يعمل كـ factory للـ jobs.

الخطوة 6 — أولويات وrate limiting

// Job عاجل
await emailQueue.add('reinitialisation-mdp', payload, { priority: 1 });

// Job عادي
await emailQueue.add('confirmation', payload, { priority: 10 });

// Job ضخم (newsletter)
await emailQueue.add('newsletter', payload, { priority: 100 });
const worker = new Worker('envoi-emails', processFn, {
  connection,
  limiter: { max: 100, duration: 1000 } // 100 jobs/ثانية كحدّ أقصى
});

قيمة أقلّ = أولوية أعلى. rate limiter يحدّ عدد jobs المُعالَجة لكلّ وحدة زمن. حيوي لاحترام حصص APIs خارجية (Mailgun يحدّ بـ 100 emails/ثانية).

الخطوة 7 — Dead Letter Queue

import { Queue, Worker } from 'bullmq';

const dlq = new Queue('dlq-emails', { connection });

const worker = new Worker('envoi-emails', async (job) => {
  // المنطق العادي
}, { connection });

worker.on('failed', async (job, err) => {
  if (job.attemptsMade >= (job.opts.attempts || 1)) {
    // كلّ المحاولات اُستُنفِدت: أرسل إلى DLQ
    await dlq.add('email-mort', {
      originalJobId: job.id,
      originalData:  job.data,
      erreur: err.message,
      stack:  err.stack,
      timestamp: Date.now()
    });
    console.warn('Job ' + job.id + ' envoyé en DLQ');
  }
});

worker ثانٍ (أو بشر عبر واجهة admin) يستهلك DLQ لفحص كلّ إخفاق. يتفادى فقدانًا صامتًا لأحداث عمل.

الخطوة 8 — مراقبة بـ OpenTelemetry

BullMQ 5.71+ يُدمج OpenTelemetry أصليًّا. كلّ job يصير span قابلًا للعرض في Jaeger أو Tempo أو Honeycomb.

npm install bullmq-otel @opentelemetry/sdk-node @opentelemetry/auto-instrumentations-node @opentelemetry/exporter-trace-otlp-http
// telemetry.js
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';

const sdk = new NodeSDK({
  traceExporter:    new OTLPTraceExporter({ url: 'http://jaeger:4318/v1/traces' }),
  instrumentations: [getNodeAutoInstrumentations()]
});
sdk.start();
// producteur.js وworker.js
import { Queue, Worker } from 'bullmq';
import { BullMQOtel } from 'bullmq-otel';

const connection = { host: '127.0.0.1', port: 6379, username: 'appuser', password: process.env.REDIS_PASSWORD };

const emailQueue = new Queue('envoi-emails', {
  connection,
  telemetry: new BullMQOtel('envoi-emails-service')
});

const worker = new Worker('envoi-emails', processFn, {
  connection,
  telemetry: new BullMQOtel('envoi-emails-service')
});

spans BullMQ تظهر في Jaeger UI بـ queue.name، job.id، job.attempts، والمدّة الكلّية.

الخطوة 9 — مراقبة بـ Bull Board

Bull Board واجهة ويب رسمية لفحص queues BullMQ. تعرض زمنيًّا: jobs في الانتظار، نشطة، مُنتَهية، مُخفِقة، delayed.

npm install @bull-board/api @bull-board/express express
// bull-board.js
import express from 'express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { Queue } from 'bullmq';

const app = express();
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [new BullMQAdapter(new Queue('envoi-emails', { connection }))],
  serverAdapter
});

app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3000, () => console.log('Bull Board sur http://localhost:3000/admin/queues'));

الواجهة على http://localhost:3000/admin/queues. للإنتاج، احمها خلف middleware توثيق. لا تُعَرِّضها عامّيًّا دون حماية.

أخطاء شائعة

الخطأ السبب الحلّ
jobs تُعالَج عدّة مرّات timeout قفل قصير أو worker ينهار زد lockDuration، أو اجعل الـ job idempotent
memory leak Redis على queue مُحَمَّلة jobs completed/failed لم تُنَظَّف هَيِّئ removeOnComplete: { count: 1000 } وremoveOnFail: { count: 5000 }
worker محجوب على job لا timeout على نداءات خارجية استعمل AbortController مع timeout
schedulers مكرّرة بعد نشر schedulers قديمة لم تُنَظَّف عند الإقلاع: queue.removeRepeatableByKey()
اتّصال Redis مفقود ← workers يتوقّفون لا إدارة لأحداث إعادة الاتّصال استمع worker.on('error')، هَيِّئ retryStrategy

الأدلّة التالية

🔝 العودة للدليل الرئيسي

FAQ

BullMQ أم RabbitMQ؟ BullMQ إن كنت على منظومة Redis وjobs بـ Node.js/Python/Bun. RabbitMQ لمعماريات متعدّدة اللغات بحاجات متقدّمة (exchanges، routing، multi-tenancy).

كم job/ثانية يمكن لـ BullMQ معالجته؟ على Redis monoserveur بـ 5 workers، 5,000 إلى 10,000 jobs/ثانية من jobs قصيرة. لـ jobs ثقيلة، الحدّ هو concurrency × عدد workers.

ماذا إن سقط Redis؟ jobs النشطة تُعاد جدولتها تلقائيًّا. jobs في wait/delayed محفوظة بـ AOF. producers خلال downtime يُخفقون.

BullMQ يدعم المعاملات؟ ليس مباشرة. لتجميع jobs ذرّيًّا، استعمل flows (DAG): الأبّ لا يُنَفَّذ إلّا بعد نجاح كلّ أبنائه.

مراجع

  • توثيق BullMQ الرسمي
  • BullMQ على GitHub
  • Bull Board — واجهة admin
  • OpenTelemetry JavaScript

مقالات ذات صلة

Sponsoriser ce contenu

Cet emplacement est à vous

Position premium en fin d'article — c'est l'instant où les lecteurs sont le plus engagés. Réservez cet espace pour votre marque, votre formation ou votre offre.

Recevoir nos tarifs
Publicité