Développement Web

Redis Streams : event log et consumer groups pas à pas

10 min de lecture

📌 Article principal de la série : Redis 8 : caching, queues, pub/sub et streams pour applications production

Les Streams Redis, introduits en version 5.0, transforment Redis en un véritable broker d’événements persistant — alternative simple et performante à Apache Kafka pour les volumes en dessous du téraoctet. Contrairement au pub/sub, chaque événement est stocké durablement, indexé par un identifiant chronologique unique, et peut être consommé par plusieurs consumer groups indépendants avec garantie de livraison. Ce tutoriel construit pas à pas un pipeline event-driven avec consumer groups, gestion des pending messages et stratégie de réessai.

Prérequis

  • Redis 8 opérationnel
  • Node.js 22 LTS, ou tout autre client (Python, Go, Java)
  • Lecture des concepts pub/sub utiles pour comparaison
  • Temps estimé : 80 minutes

Étape 1 — Créer un stream et ajouter des événements

Un stream Redis est créé implicitement à la première écriture. La commande XADD ajoute un événement avec un ID auto-généré (timestamp millisecondes + séquence) et une payload sous forme de champs clé-valeur.

redis-cli --user appuser -a "$REDIS_PASSWORD"

# Ajouter des evenements au stream 'commandes'
> XADD commandes * id 1001 userId aissa total 50000 produit "laptop"
"1715985432000-0"
> XADD commandes * id 1002 userId fatou total 8500 produit "souris"
"1715985441000-0"
> XADD commandes * id 1003 userId moussa total 120000 produit "ecran"
"1715985456000-0"

# Lire tous les evenements depuis le debut
> XRANGE commandes - +
1) 1) "1715985432000-0"
   2) 1) "id"
      2) "1001"
      ...

# Longueur du stream
> XLEN commandes
(integer) 3

L’astérisque * demande à Redis de générer l’ID automatiquement avec le format milliseconds-sequence. Cet ID est strictement croissant ce qui garantit l’ordre temporel des événements. La payload est un dictionnaire arbitraire — vous pouvez ajouter autant de champs que nécessaire (jusqu’à 512 Mo cumulés par événement).

Étape 2 — Lire les événements en mode pull (XREAD)

La commande XREAD lit un ou plusieurs streams à partir d’un ID donné. Pour lire les nouveaux événements en continu, on utilise l’option BLOCK qui attend si aucun nouvel événement n’est disponible.

// consumer-simple.js
import Redis from 'ioredis';
const redis = new Redis({ username:'appuser', password:process.env.REDIS_PASSWORD });

let dernierId = '$';  // '$' = dernier evenement existant au demarrage

while (true) {
    const result = await redis.xread('BLOCK', 5000, 'STREAMS', 'commandes', dernierId);
    if (!result) {
        console.log('Aucun evenement en 5s, on continue...');
        continue;
    }
    const [stream, events] = result[0];
    for (const [id, fields] of events) {
        // fields est un tableau ['id','1001','userId','aissa',...]
        const data = {};
        for (let i = 0; i < fields.length; i += 2) data[fields[i]] = fields[i+1];
        console.log(`Evenement ${id}:`, data);
        dernierId = id;
    }
}

Ce consommateur lit tous les nouveaux événements à partir du moment où il démarre. Limite : si le consommateur crashe et redémarre, il rate les événements arrivés pendant l’interruption (car $ = à partir de maintenant). Pour résoudre ce problème et permettre plusieurs consommateurs parallèles, on utilise les consumer groups.

Étape 3 — Créer un consumer group

Un consumer group permet à plusieurs workers de se partager les événements d’un stream tout en garantissant qu’aucun événement n’est traité deux fois. Redis maintient pour chaque consumer group la liste des événements pending (livrés mais pas encore acknowledged) et le pointeur de progression.

redis-cli --user appuser -a "$REDIS_PASSWORD"

# Creer un consumer group nomme 'traitement' demarrant a la fin du stream
> XGROUP CREATE commandes traitement $
OK

# Lister les consumer groups
> XINFO GROUPS commandes
1)  1) "name"
    2) "traitement"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1715985456000-0"

Le $ à la création signifie « commencer à lire à partir des prochains événements ajoutés ». Pour rejouer l’historique complet, utilisez 0 à la place. La présence du groupe ne consomme presque rien — c’est un simple pointeur de progression.

Étape 4 — Worker dans un consumer group

// worker-stream.js
import Redis from 'ioredis';
const redis = new Redis({ username:'appuser', password:process.env.REDIS_PASSWORD });

const STREAM = 'commandes';
const GROUP = 'traitement';
const CONSUMER = `worker-${process.pid}`;

// Boucle de consommation
while (true) {
    try {
        const result = await redis.xreadgroup(
            'GROUP', GROUP, CONSUMER,
            'COUNT', 10,
            'BLOCK', 5000,
            'STREAMS', STREAM, '>'
        );
        if (!result) continue;

        const [_streamName, events] = result[0];
        for (const [id, fields] of events) {
            try {
                const data = {};
                for (let i = 0; i < fields.length; i += 2) data[fields[i]] = fields[i+1];
                console.log(`[${CONSUMER}] Traite ${id}:`, data);

                // Traitement metier (envoi email, decrement stock, etc.)
                await traiter(data);

                // Acquittement
                await redis.xack(STREAM, GROUP, id);
            } catch (err) {
                console.error(`Erreur sur ${id}:`, err.message);
                // Pas d'ACK : l'evenement reste pending et sera reessaye
            }
        }
    } catch (err) {
        console.error('Erreur boucle:', err.message);
        await new Promise(r => setTimeout(r, 1000));
    }
}

async function traiter(donnees) {
    await new Promise(r => setTimeout(r, 200));
    if (Math.random() < 0.05) throw new Error('Erreur transitoire simulee');
}

Le caractère > dans XREADGROUP signifie « lire uniquement les événements jamais délivrés à ce consumer group ». COUNT 10 limite à 10 événements par lecture (utile pour batcher). BLOCK 5000 attend jusqu’à 5 secondes si aucun nouvel événement. Crucialement, l’XACK en fin de traitement marque l’événement comme acquitté ; sans cela, il reste dans la PEL (Pending Entries List) et sera réessayé.

Étape 5 — Lancer plusieurs workers

Lancez 3 instances du worker dans des terminaux séparés. Redis distribue automatiquement les événements entre eux — chaque événement est livré à un seul worker du groupe.

# Terminal 1
node worker-stream.js
# Terminal 2
node worker-stream.js
# Terminal 3
node worker-stream.js

Pendant que les workers tournent, injectez des événements dans un autre terminal :

for i in {1..30}; do
    redis-cli --user appuser -a "$REDIS_PASSWORD" \
        XADD commandes '*' id "$i" userId "user$i" total "$((RANDOM % 100000))"
    sleep 0.2
done

Vous observez les 30 événements répartis entre les 3 workers, environ 10 chacun. Si vous tuez un worker (Ctrl+C), les événements qui lui étaient en cours restent dans la PEL. Le tutoriel ci-dessous montre comment les réclamer.

Étape 6 — Récupérer les messages bloqués (XPENDING + XCLAIM)

Si un worker crashe alors qu’il avait des événements en cours, ces événements restent indéfiniment pending. Il faut un mécanisme pour les transférer à un autre worker au-delà d’un certain délai (idle time).

// recovery.js — a executer periodiquement
async function recovery() {
    // Liste les evenements pending depuis plus de 30 secondes
    const pending = await redis.xpending(STREAM, GROUP, 'IDLE', 30000, '-', '+', 100);

    for (const [id, consumer, msIdle, deliveryCount] of pending) {
        if (deliveryCount > 5) {
            // Trop de tentatives : envoyer en dead letter
            await redis.xadd('dead-letter', '*', 'originalId', id, 'consumer', consumer, 'deliveryCount', deliveryCount);
            await redis.xack(STREAM, GROUP, id);
            console.warn(`DLQ : ${id}`);
            continue;
        }

        // Reclaim : transferer a CE worker
        const claimed = await redis.xclaim(STREAM, GROUP, CONSUMER, 30000, id);
        if (claimed.length > 0) {
            console.log(`Reclame ${id} (idle ${msIdle}ms, ${deliveryCount} tentatives)`);
            // Le retraiter dans la boucle principale
        }
    }
}

setInterval(recovery, 10000);  // toutes les 10 secondes

La commande XPENDING liste les événements pending ; XCLAIM transfère la propriété à un autre consumer. Cette logique de recovery doit s’exécuter en arrière-plan dans chaque worker. La règle métier « si deliveryCount > 5, envoyer en dead letter » empêche les boucles infinies sur un événement empoisonné.

Étape 7 — Limiter la taille du stream

Sans contrôle, un stream croît indéfiniment et consomme de plus en plus de mémoire. Deux stratégies de rétention sont disponibles : taille maximale (MAXLEN) ou âge maximum (MINID).

// Limiter a environ 1 million d'evenements
await redis.xadd('commandes', 'MAXLEN', '~', 1000000, '*', 'id', '1001', 'userId', 'aissa');

// Ou supprimer tout ce qui est avant un timestamp donne
const il_y_a_24h = Date.now() - 24*3600*1000;
await redis.xtrim('commandes', 'MINID', '~', il_y_a_24h);

L’opérateur ~ dit « approximativement » : Redis peut conserver légèrement plus que la limite pour optimiser les performances (le nettoyage se fait par blocs entiers). C’est généralement acceptable. Pour une limite stricte, utilisez = à la place, au prix d’une performance légèrement réduite.

Étape 8 — Comparer avec Apache Kafka

Pour qui connaît Kafka, voici la correspondance des concepts :

Kafka Redis Streams
Topic Stream (clé Redis)
Partition Stream unique (pas de partitioning natif intra-stream)
Offset ID Redis Stream (timestamp-séquence)
Consumer group Consumer group (concept identique)
Commit offset XACK
Log compaction Non disponible (utiliser XTRIM par âge)
Réplication multi-broker Réplica Redis classique
Schema Registry (Avro) Non — on utilise JSON brut

Pour des volumes en dessous du téraoctet par jour et des charges modérées, Redis Streams couvre 90 % des besoins événementiels avec une simplicité opérationnelle drastiquement supérieure. Au-delà — multi-DC, rétention sur plusieurs mois, schemas typés stricts — Kafka reste le bon outil.

Étape 9 — Cas d’usage : feed d’activité utilisateur

Un cas typique : enregistrer toutes les actions utilisateur (login, achat, like, commentaire) dans un stream pour alimenter un feed d’activité, un système de recommandation et de l’analytics temps réel.

// Producer cote API
app.post('/api/like', async (req, res) => {
    await redis.xadd('activity', '*',
        'type', 'like',
        'userId', req.user.id,
        'targetId', req.body.postId,
        'ts', Date.now()
    );
    res.json({ ok: true });
});

// Consumer A : feed d'activite en temps reel
group: 'feed-builder' --> met a jour la timeline de chaque follower

// Consumer B : analytics
group: 'analytics' --> aggrege par heure et stocke en PostgreSQL

// Consumer C : ML
group: 'ml-recommandations' --> alimente un modele de recommandations

Chaque consumer group consomme indépendamment, à son propre rythme. Si le consumer ML est plus lent que le consumer feed, il prend simplement plus de retard sans bloquer les autres. C’est l’archétype d’architecture event-driven.

Erreurs fréquentes

Erreur Cause Solution
Événements jamais consommés Consumer group créé après XADD avec $ comme position de départ Recréer le groupe avec XGROUP CREATE stream group 0 pour relire depuis le début
PEL grandit indéfiniment Workers ne font pas XACK Vérifier que la branche succès du worker appelle bien XACK
Latence élevée en consommation BLOCK 0 (infini) provoque des reconnexions Redis lentes Utiliser BLOCK 5000 et boucler
Memory leak Redis Stream illimité, MAXLEN jamais appliqué Configurer un job cron qui exécute XTRIM périodiquement
Événements traités en double Worker crashe entre traiter() et XACK Rendre traiter idempotent (vérifier qu’un événement n’a pas déjà été appliqué)

Tutoriels suivants

FAQ

Streams ou BullMQ pour traiter des jobs ?
BullMQ pour les jobs avec retry, priorité, scheduling cron — il offre une abstraction de plus haut niveau et un écosystème (Bull Board). Streams pour les événements métier brut où plusieurs consommateurs indépendants doivent traiter le même flux (event sourcing, fan-out vers analytics + email + ML).
Comment partitioner pour scaler ?
Redis Streams ne supporte pas le partitioning intra-stream. Pour scaler : (1) sharder par clé applicative en créant N streams (commandes-shard-0, commandes-shard-1, …), (2) router chaque événement vers le bon shard selon hash(userId), (3) avoir N consumer groups indépendants. C’est manuel mais simple.
Quel format pour la payload ?
Champs clé-valeur plats si la structure est simple et stable. JSON encodé dans un seul champ payload si nécessaire de typages riches. Pour la production où l’évolution du schéma est critique, considérez JSON Schema ou Protobuf avec un Schema Registry maison.
Combien de streams maximum ?
Aucune limite stricte côté Redis — c’est juste des clés. Avec quelques milliers de streams sur un même Redis, les performances restent excellentes. Au-delà, envisager le partitioning par instance Redis dédiée.

Références

Service ITSkillsCenter

Site ou application web sur mesure

Conception Pro + Nom de domaine 1 an + Hébergement 1 an + Formation + Support 6 mois. Accès et code livrés. À partir de 350 000 FCFA.

Demander un devis
Publicité