تطوير الويب

Queues غير متزامنة مع BullMQ وRedis في NestJS 11

4 دقائق للقراءة

السلسلة: هذا الدرس جزء من سلسلة NestJS 11. اقرأ المقال الرئيسي.

BullMQ صار مرجع queues غير المتزامنة في نظام Node.js. الإصدار 5، المتوافق مع module الرسمي @nestjs/bullmq 11.0.4، يجلب FlowProducer لتنسيق jobs parent-enfants، الدعم الأصلي للأولوية وjobs دورية، وتكاملاً عميقاً مع Redis 8.

المتطلبات

  • API NestJS 11 مع Prisma 7 وRedis
  • Redis 7 أو 8 مع persistance AOF مفعَّلة
  • 90 دقيقة

الخطوة 1 — تثبيت @nestjs/bullmq وضبط Redis

cd apps/api
pnpm add @nestjs/bullmq bullmq ioredis

module يُضبط في app.module.ts بـ BullModule.forRootAsync. كل queue محدّدة تُعلَن عبر BullModule.registerQueue('emails') في module الأعمال.

الخطوة 2 — إعلان queue والـ producer

// emails/emails.module.ts
@Module({
  imports: [BullModule.registerQueue({ name: 'emails' })],
  providers: [EmailsService, EmailsProcessor],
  exports: [EmailsService],
})
export class EmailsModule {}

// emails/emails.service.ts
@Injectable()
export class EmailsService {
  constructor(@InjectQueue('emails') private queue: Queue<EmailPayload>) {}

  async sendWelcome(userId: string, email: string) {
    await this.queue.add('welcome', { userId, email }, {
      attempts: 5,
      backoff: { type: 'exponential', delay: 5000 },
      removeOnComplete: 100,
      removeOnFail: 1000,
    });
  }
}

attempts: 5 يأذن بأربع إعادات بعد الفشل الأول. backoff أسي يبدأ بـ 5 ثوان ويتضاعف. removeOnComplete: 100 يحفظ آخر 100 job ناجح للـ debug.

الخطوة 3 — تنفيذ worker

// emails/emails.processor.ts
@Processor('emails', { concurrency: 5 })
export class EmailsProcessor extends WorkerHost {
  constructor(private mailer: MailerService) { super(); }

  async process(job: Job<EmailPayload>) {
    if (job.name === 'welcome') {
      return this.mailer.sendTemplate('welcome', job.data);
    }
    throw new Error('Unknown job name: ' + job.name);
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job, err: Error) {
    console.error('Job ' + job.id + ' failed:', err.message);
  }
}

concurrency 5 يدلّ على معالجة 5 jobs متزامنة. إرسال بريد I/O-bound يحتمل 20-50؛ PDF CPU-bound يحدّ بعدد cores.

الخطوة 4 — Jobs دورية بـ cron

// reports/reports.service.ts
async onModuleInit() {
  await this.queue.add('daily-report', {}, {
    repeat: { pattern: '0 6 * * *', tz: 'Africa/Casablanca' },
    removeOnComplete: 7,
  });
}

cron '0 6 * * *' يطلق job كل يوم في 6 صباحاً. timezone صريح: بلا tz، BullMQ يستخدم UTC.

الخطوة 5 — Workflows مع FlowProducer

// invoices/invoices.service.ts
import { FlowProducer } from 'bullmq';

const flow = new FlowProducer({ connection });
await flow.add({
  name: 'send-invoice',
  queueName: 'emails',
  data: { invoiceId },
  children: [
    { name: 'generate-pdf', queueName: 'pdf',     data: { invoiceId } },
    { name: 'upload-s3',    queueName: 'storage', data: { invoiceId } },
  ],
});

job send-invoice لا يُنفَّذ إلا بعد generate-pdf وupload-s3. payload الأب يستهلك إرجاعات الأبناء عبر job.getChildrenValues().

الخطوة 6 — نمط outbox للتماسك transactionnel

الفخّ الكلاسيكي: خدمة تكتب في القاعدة ثم تدفع job؛ إن أخفقت إنفاذ، القاعدة متماسكة لكن job مفقود. أسوأ، job قد يُنفَّذ بينما transaction PostgreSQL يلفّ خلفاً. نمط outbox يكتب الرسالة في جدول Outbox في نفس transaction، وpoller يقرأ هذا الجدول للنشر في BullMQ.

// orders/orders.service.ts
await this.prisma.$transaction([
  this.prisma.order.create({ data: orderData }),
  this.prisma.outbox.create({ data: { topic: 'order.created', payload: orderData } }),
]);

// outbox/outbox.poller.ts (cron كل ثانيتين)
@Cron('*/2 * * * * *')
async drain() {
  const messages = await this.prisma.outbox.findMany({
    where: { sentAt: null }, take: 100, orderBy: { id: 'asc' },
  });
  for (const m of messages) {
    await this.queues[m.topic].add(m.topic, m.payload);
    await this.prisma.outbox.update({ where: { id: m.id }, data: { sentAt: new Date() } });
  }
}

الخطوة 7 — الرصد مع Bull Board

pnpm add @bull-board/express @bull-board/api

// admin/bull-board.module.ts
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
  queues: [new BullMQAdapter(queue)],
  serverAdapter,
});
app.use('/admin/queues', JwtAuthGuard, serverAdapter.getRouter());

الواجهة تصير متاحة على /admin/queues. يجب حمايتها مطلقاً بـ JWT guard وتقييدها للمدراء عبر Casbin — كشف Bull Board عاماً يسمح لأي شخص بإعادة تشغيل أو حذف jobs حرجة.

الخطوة 8 — اختبارات تكامل للـ workers

describe('EmailsProcessor', () => {
  it('processes a welcome email', async () => {
    const job = await queue.add('welcome', { userId: 'u1', email: 'x@x' });
    await waitForJob(job.id);
    expect(mailer.send).toHaveBeenCalledWith({ to: 'x@x', template: 'welcome' });
  });
});

الخطوة 9 — مقاييس Prometheus وتنبيهات

// metrics/queue-metrics.service.ts
@Injectable()
export class QueueMetricsService {
  @Cron('*/30 * * * * *')
  async collect() {
    for (const q of this.queues) {
      const counts = await q.getJobCounts();
      Object.entries(counts).forEach(([state, n]) =>
        gauge.labels({ queue: q.name, state }).set(n));
    }
  }
}

قاعدة التنبيه الأنفع: عمق queue waiting > 1000 لمدة 5 دقائق يدلّ إما على worker ميت أو تدفق شاذ. تنبيه ثانٍ على معدل فشل > 5%.

أخطاء شائعة

الخطأ السبب الحل
Jobs لا تُعالَج أبداً Worker غير مُنشَأ تحقّق providers module
ذاكرة Redis تصعد removeOnComplete غير مُعَدّ حدّ بـ 100 أو TTL
تكرار jobs Producer مُعاد بلا مفتاح idempotence jobId ثابت من البيانات
Worker محجوب بعد crash Stalled jobs غير مستردَّة stalledInterval + log حادث
Connection lost في حلقة maxRetriesPerRequest: null غائب اضبط ioredis لـ BullMQ

الخيار maxRetriesPerRequest: null على client ioredis المستخدَم من BullMQ موثَّق لكن منسي. بدونه، ioredis يتخلى بعد 20 محاولة إعادة اتصال والـ worker يموت صامتاً.

أسئلة شائعة

BullMQ أم bullmq-pro؟ BullMQ open-source يغطي 95% من الحاجات. Pro يضيف observers متقدمة وrate-limiting بمجموعة.

queue لكل نوع job أم queue مشتركة؟ queue لكل مجال أعمال (emails، pdf، storage، analytics) أفضل ممارسة. تسمح بتخصيص concurrency مختلفة وعزل الحوادث.

كيف نختبر محلياً بلا Redis؟ صورة redis:8-alpine عبر Docker تنطلق في ثانيتين وتستهلك 10 ميغا RAM. لا داعي للبحث عن mock.

كيف نُهاجر من Bull (قديم) إلى BullMQ؟ Bull وBullMQ يتشاركان المفاهيم لا بنى Redis. الهجرة عبر تصريف: أوقف الإنتاج في Bull، دع workers Bull تنهي jobsها، بدّل producer إلى BullMQ، انشر worker BullMQ.

مقالات ذات صلة

Sponsoriser ce contenu

Cet emplacement est à vous

Position premium en fin d'article — c'est l'instant où les lecteurs sont le plus engagés. Réservez cet espace pour votre marque, votre formation ou votre offre.

Recevoir nos tarifs
Publicité