Développement Web

Queues asynchrones avec BullMQ 5.76 et Redis 8 : tutoriel pas à pas

12 min de lecture

📌 Article principal de la série : Redis 8 : caching, queues, pub/sub et streams pour applications production
Prérequis : Installer Redis 8 sur Linux

BullMQ est devenu en quelques années le framework de queue de référence pour Node.js : robuste, performant, riche en fonctionnalités, et activement maintenu. Construit au-dessus de Redis, il permet de traiter des dizaines de millions de jobs par jour sur une infrastructure modeste. La version 5.76, publiée en mai 2026, intègre OpenTelemetry pour le tracing distribué, les flow producers pour modéliser des DAG de jobs interdépendants, le rate limiting fin et les dead letter queues. Ce tutoriel construit pas à pas un pipeline complet : producteur, worker, retry, jobs delayed, priorités et observabilité.

Prérequis

  • Redis 8 opérationnel (tutoriel installation)
  • Node.js 22 LTS
  • Maîtrise async/await
  • Temps estimé : 75 minutes

Étape 1 — Initialiser le projet et installer BullMQ

Démarrons par un projet Node.js standard avec BullMQ comme seule dépendance directe. ioredis est ramené automatiquement comme transitive — c’est le client Redis utilisé sous le capot.

mkdir bullmq-demo
cd bullmq-demo
npm init -y
npm install bullmq

# Verifier la version installee
node -e "console.log(require('bullmq/package.json').version)"

Vous devez obtenir 5.76.8 ou une version supérieure de la série 5.76. Les versions 5.x sont la branche actuelle stable depuis 2024. La version majeure 6 est en bêta avec des changements de breaking sur les types TypeScript ; pour la production, restez sur 5.7x. Configurez aussi type: "module" dans package.json pour utiliser la syntaxe ES Modules moderne plutôt que CommonJS.

Étape 2 — Créer un producteur de jobs

Dans le pattern producteur/consommateur, le producteur est typiquement votre serveur HTTP : il reçoit une requête utilisateur, identifie qu’un traitement asynchrone est nécessaire, pousse un job dans une queue, et retourne immédiatement une réponse à l’utilisateur. Le travail réel sera exécuté par un worker en arrière-plan.

// producteur.js
import { Queue } from 'bullmq';

const connection = {
    host: '127.0.0.1',
    port: 6379,
    username: 'appuser',
    password: process.env.REDIS_PASSWORD
};

const emailQueue = new Queue('envoi-emails', { connection });

async function ajouterEmail(destinataire, sujet, corps) {
    const job = await emailQueue.add('email-transactionnel', {
        destinataire,
        sujet,
        corps,
        envoyeA: new Date().toISOString()
    }, {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 }
    });
    console.log(`Job ${job.id} ajoute dans la queue`);
    return job.id;
}

// Simulation d'usage depuis un endpoint HTTP
await ajouterEmail('client@example.com', 'Confirmation commande', 'Merci pour votre achat...');
process.exit(0);

La queue est identifiée par son nom envoi-emails — c’est ce nom qui sera partagé entre producteur et worker. Le payload du job (deuxième argument de add()) est un objet JavaScript sérialisé automatiquement en JSON. Les options attempts: 3 et backoff: { type: 'exponential', delay: 2000 } configurent le retry : en cas d’échec, BullMQ retentera 3 fois (2 retries après le premier essai) avec un délai exponentiel — 2 s, 4 s, 8 s — entre chaque tentative. Cette stratégie est cruciale pour absorber les pannes transitoires (API externe indisponible, base lente, etc.).

Étape 3 — Créer un worker qui consomme les jobs

Le worker est un processus indépendant qui se connecte à la même queue et exécute le code métier. On peut lancer N workers en parallèle — BullMQ assure que chaque job n’est traité que par un seul worker grâce à des verrous Redis.

// worker.js
import { Worker } from 'bullmq';

const connection = { host: '127.0.0.1', port: 6379, username: 'appuser', password: process.env.REDIS_PASSWORD };

const worker = new Worker('envoi-emails', async (job) => {
    console.log(`Traitement job ${job.id} : email a ${job.data.destinataire}`);

    // Simulation envoi via SMTP / API
    await new Promise(r => setTimeout(r, 500));

    // Simuler une erreur intermittente (10% du temps)
    if (Math.random() < 0.1) {
        throw new Error('Service SMTP temporairement indisponible');
    }

    return { envoye: true, ts: Date.now() };
}, {
    connection,
    concurrency: 5
});

worker.on('completed', (job, result) => {
    console.log(`Job ${job.id} OK :`, result);
});

worker.on('failed', (job, err) => {
    console.error(`Job ${job?.id} echec (essai ${job?.attemptsMade}/${job?.opts.attempts}) :`, err.message);
});

L’option concurrency: 5 permet au worker de traiter jusqu’à 5 jobs en parallèle — utile pour les jobs I/O-bound (appels API, requêtes base). Pour les jobs CPU-bound, lancez plutôt plusieurs processus avec concurrency: 1 chacun, via PM2 ou un orchestrateur. L’événement completed est émis quand un job termine sans erreur ; failed quand toutes les tentatives ont échoué. BullMQ persiste les jobs terminés (succès ou échec) dans Redis pendant 7 jours par défaut, ce qui permet le debug a posteriori.

Étape 4 — Tester le pipeline complet

Lancez le worker dans un terminal et le producteur dans un autre. Vous devez voir les jobs apparaître côté worker au fur et à mesure de leur création côté producteur.

# Terminal 1 : lancer le worker
node worker.js

# Terminal 2 : injecter des jobs
node producteur.js
# Repeter ou injecter plusieurs jobs dans une boucle
for i in {1..20}; do node producteur.js; done

Le worker affichera la séquence : « Traitement job 1… Job 1 OK ». Environ 10 % des jobs vont échouer la première fois (simulation aléatoire), seront retryés, et finiront par réussir ou être marqués comme échec permanent. Inspectez l’état de la queue avec redis-cli :

redis-cli --user appuser -a "$REDIS_PASSWORD" KEYS "bull:envoi-emails:*" | head
redis-cli --user appuser -a "$REDIS_PASSWORD" LLEN "bull:envoi-emails:wait"
redis-cli --user appuser -a "$REDIS_PASSWORD" ZCARD "bull:envoi-emails:completed"
redis-cli --user appuser -a "$REDIS_PASSWORD" ZCARD "bull:envoi-emails:failed"

BullMQ stocke les jobs dans plusieurs structures Redis : wait (liste FIFO des jobs en attente), active (jobs en cours de traitement), completed (sorted set par timestamp), failed (idem), delayed (sorted set par échéance pour les jobs différés). Comprendre ce mapping aide à débugger efficacement en production.

Étape 5 — Jobs différés et récurrents

BullMQ supporte les jobs avec exécution différée (par exemple, envoyer un rappel dans 24 h) et les jobs récurrents façon cron (par exemple, exécuter une tâche de nettoyage chaque nuit).

// Job differe : execution dans 5 minutes
await emailQueue.add('rappel', { user: 'aissa@example.com' }, {
    delay: 5 * 60 * 1000  // millisecondes
});

// Job recurrent (API moderne — recommandee depuis BullMQ 5.16) :
await emailQueue.upsertJobScheduler(
    'rapport-quotidien',                 // identifiant du scheduler (idempotent)
    { pattern: '0 3 * * *' },            // cron : tous les jours a 03h00 UTC
    { name: 'rapport-journalier', data: {}, opts: {} }
);

// Lister les schedulers actifs
const schedulers = await emailQueue.getJobSchedulers();
console.log(schedulers);

Le delay en millisecondes pousse le job dans la liste delayed de Redis ; un mécanisme interne déplace le job vers wait au bon moment. Depuis BullMQ 5.16, l’ancienne API add(name, data, { repeat: ... }) est dépréciée au profit de upsertJobScheduler qui agit comme une factory de jobs : nommé par un identifiant stable (ici rapport-quotidien), il génère automatiquement les jobs selon le pattern et reste idempotent au redéploiement — appeler upsertJobScheduler avec le même ID met à jour le scheduler existant au lieu d’en créer un doublon. Pour supprimer un scheduler, utilisez removeJobScheduler(id).

Étape 6 — Priorités et rate limiting

Pour gérer plusieurs niveaux d’urgence (par exemple, traiter d’abord les emails transactionnels critiques avant les newsletters), BullMQ supporte les priorités numériques : plus la valeur est basse, plus le job est prioritaire.

// Job urgent
await emailQueue.add('reinitialisation-mdp', payload, { priority: 1 });

// Job normal
await emailQueue.add('confirmation', payload, { priority: 10 });

// Job de masse (newsletter)
await emailQueue.add('newsletter', payload, { priority: 100 });

Le rate limiting limite le nombre de jobs traités par unité de temps. C’est crucial pour respecter les quotas des API externes (Mailgun limite à 100 emails/seconde, par exemple) ou pour éviter de saturer une base de données aval.

const worker = new Worker('envoi-emails', processFn, {
    connection,
    limiter: { max: 100, duration: 1000 }  // max 100 jobs par seconde
});

Si le worker atteint la limite, les jobs supplémentaires attendent dans la queue plutôt que de saturer le service aval. Cette protection est indispensable en production — un pic de trafic mal géré peut causer un ban temporaire de votre IP par le fournisseur SMTP.

Étape 7 — Dead Letter Queue

Quand un job échoue toutes ses tentatives, il termine en état failed. En production, il faut une stratégie pour ces jobs morts : alerter une équipe, déplacer vers une queue d’inspection humaine, ou réinjecter manuellement après correction du bug. Le pattern Dead Letter Queue (DLQ) automatise le déplacement.

import { Queue, Worker } from 'bullmq';

const dlq = new Queue('dlq-emails', { connection });

const worker = new Worker('envoi-emails', async (job) => {
    // logique normale
}, { connection });

worker.on('failed', async (job, err) => {
    if (job.attemptsMade >= (job.opts.attempts || 1)) {
        // Toutes tentatives epuisees : envoyer en DLQ
        await dlq.add('email-mort', {
            originalJobId: job.id,
            originalData: job.data,
            erreur: err.message,
            stack: err.stack,
            timestamp: Date.now()
        });
        console.warn(`Job ${job.id} envoye en DLQ`);
    }
});

Un second worker (ou un humain via une interface admin) consomme la DLQ pour inspecter chaque échec. Cela évite la perte silencieuse d’évènements métiers — par exemple, un email de confirmation de commande qui ne part jamais alors que le client a payé.

Étape 8 — Observabilité avec OpenTelemetry

BullMQ 5.71+ intègre nativement OpenTelemetry. En configurant le SDK OpenTelemetry de Node.js, chaque job devient une span qui peut être visualisée dans Jaeger, Tempo, Honeycomb ou n’importe quel backend OTLP. Cela permet de tracer le parcours complet d’une requête utilisateur : HTTP request → producteur ajoute job → job differe X secondes → worker execute → SMTP envoie → resultat.

npm install bullmq-otel @opentelemetry/sdk-node @opentelemetry/auto-instrumentations-node @opentelemetry/exporter-trace-otlp-http

Le package bullmq-otel fournit l’instrumentation officielle BullMQ ; les autres packages OpenTelemetry exportent les traces (HTTP, gRPC) vers le backend (Jaeger, Tempo, Honeycomb, Datadog). Configurez d’abord le SDK OpenTelemetry de Node.js :

// telemetry.js
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';

const sdk = new NodeSDK({
    traceExporter: new OTLPTraceExporter({ url: 'http://jaeger:4318/v1/traces' }),
    instrumentations: [getNodeAutoInstrumentations()]
});
sdk.start();

Puis attachez l’instrumentation BullMQ directement sur chaque Queue et Worker via l’option telemetry :

// producteur.js et worker.js
import { Queue, Worker } from 'bullmq';
import { BullMQOtel } from 'bullmq-otel';

const connection = { host: '127.0.0.1', port: 6379, username: 'appuser', password: process.env.REDIS_PASSWORD };

const emailQueue = new Queue('envoi-emails', {
    connection,
    telemetry: new BullMQOtel('envoi-emails-service')
});

const worker = new Worker('envoi-emails', processFn, {
    connection,
    telemetry: new BullMQOtel('envoi-emails-service')
});

Importez telemetry.js en première ligne de producteur.js et worker.js (avant les autres imports), puis instanciez BullMQOtel sur chaque Queue et Worker. Les spans BullMQ apparaissent dans Jaeger UI avec les attributs standardisés queue.name, job.id, job.attempts, job.delay, et la durée totale du job. Pour les déploiements modestes, Coolify peut héberger Jaeger en une commande.

Étape 9 — Monitoring avec Bull Board

Bull Board est une interface web officielle pour inspecter les queues BullMQ. Elle affiche en temps réel : jobs en attente, actifs, complétés, échoués, delayed. Indispensable en debug.

npm install @bull-board/api @bull-board/express express
// bull-board.js
import express from 'express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { Queue } from 'bullmq';

const app = express();
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
    queues: [new BullMQAdapter(new Queue('envoi-emails', { connection }))],
    serverAdapter
});

app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3000, () => console.log('Bull Board sur http://localhost:3000/admin/queues'));

L’interface est accessible sur http://localhost:3000/admin/queues. Pour la production, protégez-la derrière un middleware d’authentification (Basic auth, JWT, ou auth via un reverse proxy). Bull Board ne doit jamais être exposé publiquement sans protection — il permet de relancer ou supprimer des jobs.

Erreurs fréquentes

Erreur Cause Solution
Jobs traités plusieurs fois Timeout du verrou trop court ou worker qui plante Augmenter lockDuration dans options worker, ou rendre le job idempotent
Memory leak Redis sur queue très chargée Jobs completed/failed jamais nettoyés Configurer removeOnComplete: { count: 1000 } et removeOnFail: { count: 5000 }
Worker bloque indéfiniment sur job Pas de timeout sur les appels externes Utiliser AbortController avec timeout, lever erreur explicite
Jobs recurrents en doublon après redeploiement Anciens schedulers non nettoyés Au boot : queue.removeRepeatableByKey() pour tous les anciens patterns
Connexion Redis perdue → workers stoppent Pas de gestion d’événements de reconnexion Écouter worker.on('error'), configurer retryStrategy robuste sur ioredis

Tutoriels suivants

FAQ

BullMQ ou RabbitMQ pour les queues ?
BullMQ si vous êtes déjà sur l’écosystème Redis et que vos jobs sont Node.js / Python / Bun. RabbitMQ pour les architectures multi-langages avec besoins avancés (exchanges, routing complexe, dead letter exchanges natifs, multi-tenancy). BullMQ est plus simple, RabbitMQ plus flexible.
Combien de jobs par seconde BullMQ peut-il traiter ?
Sur un Redis monoserveur avec 5 workers concurrents, on traite couramment 5 000 à 10 000 jobs/seconde de jobs courts (qq ms de traitement). Pour des jobs lourds (1+ seconde), la limite est la concurrence × nombre de workers, pas Redis. Scaler horizontalement signifie ajouter des processus worker.
Que faire si Redis tombe ?
Les jobs en cours sont reprogrammés automatiquement quand Redis revient (grâce aux verrous distribués). Les jobs déjà en wait/delayed sont préservés grâce à la persistance AOF. Les producteurs nouvellement ajoutés pendant le downtime échouent — il faut traiter cette erreur côté serveur HTTP (retry court ou réponse 503).
BullMQ supporte-t-il les transactions ?
Pas directement. Pour grouper plusieurs jobs atomiquement, utilisez les flows (DAG) qui permettent de définir des dépendances parent-enfant : le parent ne s’exécute qu’après le succès de tous ses enfants. C’est l’équivalent BullMQ d’une transaction métier.

Références

Service ITSkillsCenter

Site ou application web sur mesure

Conception Pro + Nom de domaine 1 an + Hébergement 1 an + Formation + Support 6 mois. Accès et code livrés. À partir de 350 000 FCFA.

Demander un devis
Publicité