ITSkillsCenter
Blog

Queues asynchrones avec BullMQ et Redis dans NestJS 11

11 دقائق للقراءة

BullMQ est devenu la référence des queues asynchrones dans l’écosystème Node.js. Sa version 5, alignée sur le module officiel @nestjs/bullmq 11.0.4, apporte les FlowProducer pour orchestrer des jobs parents-enfants, le support natif de la priorité et des jobs récurrents, et une intégration profonde avec Redis 8. Ce tutoriel met en place une infrastructure de jobs complète sur NestJS 11 : producer typé, worker dédié, retries exponentiels, jobs récurrents, monitoring Bull Board, et pattern outbox pour la cohérence transactionnelle.

📍 Article principal : NestJS 11 pour startup : architecture production 2026. Cette brique consomme la base de données Prisma et la connexion Redis déjà en place pour le rate-limiter.

Prérequis

  • API NestJS 11 fonctionnelle avec Prisma 7 et Redis
  • Tutoriel Prisma + Postgres en place
  • Redis 7 ou 8 accessible avec persistance AOF activée
  • Compréhension des concepts producer/consumer
  • Temps estimé : 90 minutes

Étape 1 — Installer @nestjs/bullmq et configurer la connexion Redis

L’installation se fait en deux paquets : bullmq en dépendance directe pour les types et le runtime, et @nestjs/bullmq qui apporte les décorateurs et les modules. Le partage du même client ioredis entre throttler et BullMQ économise des connexions, mais BullMQ recommande sa propre connexion dédiée pour éviter les blocages mutuels en cas de pic.

cd apps/api
pnpm add @nestjs/bullmq bullmq ioredis

Le module se configure dans app.module.ts avec BullModule.forRootAsync qui définit la connexion Redis utilisée par défaut pour toutes les queues. Chaque queue spécifique se déclare ensuite via BullModule.registerQueue('emails') dans le module métier qui en a besoin. Cette séparation permet à différents domaines d’avoir leurs propres workers avec des concurrences différentes.

Étape 2 — Déclarer une queue et son producer

Le producer est simplement un service qui injecte la queue typée et expose des méthodes métier. Garder la couche queue derrière un service permet de changer de backend (Kafka, NATS) sans toucher aux appelants. Le typage du payload est critique : un job qui reçoit des données mal typées vivra silencieusement en file d’attente jusqu’à exploser au runtime.

// emails/emails.module.ts
@Module({
  imports: [BullModule.registerQueue({ name: 'emails' })],
  providers: [EmailsService, EmailsProcessor],
  exports: [EmailsService],
})
export class EmailsModule {}

// emails/emails.service.ts
@Injectable()
export class EmailsService {
  constructor(@InjectQueue('emails') private queue: Queue<EmailPayload>) {}

  async sendWelcome(userId: string, email: string) {
    await this.queue.add('welcome', { userId, email }, {
      attempts: 5,
      backoff: { type: 'exponential', delay: 5000 },
      removeOnComplete: 100,
      removeOnFail: 1000,
    });
  }
}

Trois options méritent commentaire. attempts: 5 autorise quatre rejeux après le premier échec. Le backoff exponentiel commence à 5 secondes et double à chaque tentative, ce qui laisse le temps à un service tiers de redevenir disponible. removeOnComplete: 100 conserve les 100 derniers jobs réussis dans Redis pour le debug — au-delà, la mémoire Redis grimpe inutilement. removeOnFail: 1000 garde plus de traces sur les échecs, qui sont les jobs qui demandent investigation.

Étape 3 — Implémenter le worker

Le worker est une classe qui hérite de WorkerHost et qui implémente la méthode process. NestJS reconnaît le décorateur @Processor et instancie automatiquement un Worker BullMQ qui consomme la queue. La gestion d’erreur est implicite : toute exception dans process déclenche le retry selon la politique du job.

// emails/emails.processor.ts
@Processor('emails', { concurrency: 5 })
export class EmailsProcessor extends WorkerHost {
  constructor(private mailer: MailerService) { super(); }

  async process(job: Job<EmailPayload>) {
    if (job.name === 'welcome') {
      return this.mailer.sendTemplate('welcome', job.data);
    }
    throw new Error(`Unknown job name: ${job.name}`);
  }

  @OnWorkerEvent('failed')
  onFailed(job: Job, err: Error) {
    console.error(`Job ${job.id} failed:`, err.message);
  }
}

Le paramètre concurrency: 5 indique que le worker traite cinq jobs simultanément. Cette valeur dépend du travail réel : un envoi d’email I/O-bound supporte 20 ou 50 ; un PDF rendu côté serveur CPU-bound se limite au nombre de cores. Trop de concurrence sature la mémoire ou les connexions sortantes ; trop peu sous-exploite la machine. Mesurer avant d’ajuster.

Étape 4 — Jobs récurrents avec cron

Les tâches périodiques (rapport quotidien, nettoyage des sessions expirées, recalcul de statistiques) s’expriment naturellement dans BullMQ via repeat. Contrairement à un cron système, ces jobs profitent du retry automatique en cas d’échec et survivent aux redémarrages de l’application.

// reports/reports.service.ts
async onModuleInit() {
  await this.queue.add('daily-report', {}, {
    repeat: { pattern: '0 6 * * *', tz: 'Africa/Dakar' },
    removeOnComplete: 7,
  });
}

Le pattern cron '0 6 * * *' déclenche le job tous les jours à 6h. La timezone est explicite : sans tz, BullMQ utilise UTC, ce qui peut décaler l’exécution. Pour éviter la duplication du job à chaque redémarrage, BullMQ utilise un job ID dérivé du pattern et de la timezone, ce qui rend l’ajout idempotent. Une vérification queue.getRepeatableJobs() liste les jobs récurrents enregistrés et permet de purger ceux qui ne sont plus utilisés.

Étape 5 — Workflows avec FlowProducer

Certains processus métier sont des séquences : générer un PDF, l’uploader sur S3, envoyer un email avec le lien. Plutôt que d’enchaîner manuellement les jobs depuis le worker (anti-pattern qui couple les workers), FlowProducer orchestre le flux en déclarant le graphe parent-enfant. BullMQ garantit qu’un parent ne s’exécute qu’après la réussite de tous ses enfants.

// invoices/invoices.service.ts
import { FlowProducer } from 'bullmq';

const flow = new FlowProducer({ connection });
await flow.add({
  name: 'send-invoice',
  queueName: 'emails',
  data: { invoiceId },
  children: [
    { name: 'generate-pdf', queueName: 'pdf', data: { invoiceId } },
    { name: 'upload-s3', queueName: 'storage', data: { invoiceId } },
  ],
});

Le job send-invoice ne s’exécute qu’après generate-pdf et upload-s3. Si l’un échoue après ses retries, le parent reste en attente jusqu’à ce que les enfants finissent. Le payload du parent peut consommer les retours des enfants via job.getChildrenValues(), ce qui permet de transmettre l’URL S3 générée à l’email final. Ce modèle remplace les machines à états fragiles par un graphe explicite.

Étape 6 — Pattern outbox pour la cohérence transactionnelle

Le piège classique : un service écrit en base puis enfile un job ; si l’enfilage échoue, la base est cohérente mais le job manquant cause un bug fonctionnel. Pire, dans une transaction explicite, le job peut être enfilé alors que la transaction PostgreSQL roule en arrière, et un worker traite un état qui n’a jamais existé. Le pattern outbox écrit le message dans une table Outbox dans la même transaction que les autres mutations, et un poller relit cette table pour publier dans BullMQ.

// orders/orders.service.ts
await this.prisma.$transaction([
  this.prisma.order.create({ data: orderData }),
  this.prisma.outbox.create({ data: { topic: 'order.created', payload: orderData } }),
]);

// outbox/outbox.poller.ts (cron toutes les 2 secondes)
@Cron('*/2 * * * * *')
async drain() {
  const messages = await this.prisma.outbox.findMany({
    where: { sentAt: null }, take: 100, orderBy: { id: 'asc' },
  });
  for (const m of messages) {
    await this.queues[m.topic].add(m.topic, m.payload);
    await this.prisma.outbox.update({ where: { id: m.id }, data: { sentAt: new Date() } });
  }
}

Cette approche garantit qu’un job correspond toujours à un état committé. Le coût est minime : une table en plus, un poller qui consomme moins de 1 % de CPU. Pour les charges très élevées, remplacer le poller polling par LISTEN/NOTIFY de PostgreSQL pour réagir instantanément aux insertions. La cohérence est exacte ; l’overhead reste maîtrisé.

Étape 7 — Monitoring avec Bull Board

L’observabilité d’une queue ne se résume pas à des logs. Bull Board est une interface web qui liste les jobs en attente, en cours, échoués, complétés, et permet de relancer manuellement un job échoué. Son installation prend cinq minutes et change radicalement le diagnostic des incidents.

pnpm add @bull-board/express @bull-board/api

// admin/bull-board.module.ts
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
  queues: [new BullMQAdapter(queue)],
  serverAdapter,
});
app.use('/admin/queues', JwtAuthGuard, serverAdapter.getRouter());

L’interface est ensuite disponible sur /admin/queues. Elle doit absolument être protégée par le guard JWT et restreinte aux administrateurs via Casbin — exposer Bull Board publiquement permet à n’importe qui de relancer ou supprimer des jobs critiques. La protection est ce qui distingue un outil utile d’un outil dangereux.

Étape 8 — Tests d’intégration des workers

Tester un worker en isolation revient à instancier le module Nest en mode test, à pousser un job manuellement, et à vérifier que la base reflète bien l’effet. Les tests utilisent une instance Redis dédiée — souvent un Redis en mémoire via ioredis-mock ou un conteneur Docker éphémère via testcontainers — pour ne pas interférer avec le développement.

describe('EmailsProcessor', () => {
  it('processes a welcome email', async () => {
    const job = await queue.add('welcome', { userId: 'u1', email: 'x@x' });
    await waitForJob(job.id);
    expect(mailer.send).toHaveBeenCalledWith({ to: 'x@x', template: 'welcome' });
  });
});

L’utilitaire waitForJob attend que le job ait été traité avant de vérifier l’effet, ce qui rend le test déterministe. Sans cette attente, le test peut passer ou échouer selon la vitesse du worker — flakiness garantie. Cette discipline test prend du temps mais évite des heures de débogage en production.

Étape 9 — Métriques Prometheus et alerting

Une queue silencieuse en production est un risque. Une queue dont la profondeur grimpe sans qu’on s’en aperçoive bloque tout le pipeline asynchrone. Le module bullmq-prometheus expose les métriques essentielles : nombre de jobs en attente, en cours, échoués, durée médiane et p99 de traitement. Ces métriques se branchent sur Prometheus puis Grafana, et déclenchent des alertes sur des seuils explicites.

// metrics/queue-metrics.service.ts
@Injectable()
export class QueueMetricsService {
  @Cron('*/30 * * * * *')
  async collect() {
    for (const q of this.queues) {
      const counts = await q.getJobCounts();
      Object.entries(counts).forEach(([state, n]) =>
        gauge.labels({ queue: q.name, state }).set(n));
    }
  }
}

La règle d’alerte la plus utile : profondeur de la queue waiting > 1000 pendant 5 minutes indique soit un worker mort, soit un afflux anormal. Une seconde alerte sur taux d’échec > 5 % remonte les bugs introduits par un déploiement. Ces deux seuils couvrent 80 % des incidents queue.

Erreurs fréquentes

Erreur Cause Solution
Jobs jamais traités Worker non instancié (Module non importé) Vérifier providers du module
Mémoire Redis qui grimpe removeOnComplete non configuré Limiter à 100 ou un TTL
Doublons de jobs Producer rejoué sans clé d’idempotence jobId stable depuis la donnée
Worker bloqué après crash Stalled jobs non récupérés stalledInterval + log d’incident
Connection lost en boucle maxRetriesPerRequest: null absent Configuration ioredis pour BullMQ

L’option maxRetriesPerRequest: null sur le client ioredis utilisé par BullMQ est documentée mais oubliée par tous ceux qui découvrent l’outil. Sans elle, ioredis abandonne après 20 tentatives de reconnexion et le worker meurt silencieusement. Avec elle, ioredis tente indéfiniment, ce qui est le comportement souhaité pour une queue qui doit survivre à un Redis qui redémarre.

Sécurisation et confidentialité des payloads

Les jobs en file contiennent souvent des données sensibles (emails, montants, IDs internes). Redis n’est pas chiffré au repos par défaut. Pour les charges sensibles, soit chiffrer le payload côté producer avec une clé dérivée par tenant, soit déployer Redis avec TLS et chiffrement disque côté volume Coolify. Cette précaution évite qu’une fuite de la base Redis ne compromette toutes les données en transit.

FAQ

BullMQ ou bullmq-pro ?
BullMQ open-source couvre 95 % des besoins. La version Pro ajoute des observers avancés et un rate-limiting de groupe utile dans des contextes très spécifiques (par exemple, limiter les jobs par tenant dans un SaaS multi-tenant). Pour une startup, démarrer en open-source.

Faut-il une queue par type de job ou une seule queue partagée ?
Une queue par domaine métier (emails, pdf, storage, analytics) reste la meilleure pratique. Cette séparation permet d’allouer une concurrence différente à chaque worker et d’isoler les incidents : un bug dans le worker PDF ne bloque pas les emails.

Comment tester en environnement local sans Redis ?
L’image redis:8-alpine via Docker démarre en deux secondes et consomme 10 Mo de RAM. Il n’y a vraiment aucune raison de chercher un mock — la vraie instance est plus simple et plus fidèle.

Comment migrer d’une queue Bull (legacy) vers BullMQ ?
Bull et BullMQ partagent les concepts mais pas les structures Redis. La migration se fait par drainage : arrêter de produire dans Bull, laisser les workers Bull terminer leurs jobs, basculer le producer sur BullMQ, déployer un worker BullMQ. La double exécution est minimisée à quelques minutes.

Tutoriels associés

Références

Sponsoriser ce contenu

Cet emplacement est à vous

Position premium en fin d'article — c'est l'instant où les lecteurs sont le plus engagés. Réservez cet espace pour votre marque, votre formation ou votre offre.

Recevoir nos tarifs
Publicité