Développement Web

Kafka Streams 4.2 en Java : agrégations, fenêtres et jointures

16 دقائق للقراءة

Guide principal : Apache Kafka en production en 2026 : panorama complet

Tutoriels reliés : Producers et consumers idempotents · Pipeline CDC PostgreSQL avec Debezium

Kafka Streams est la bibliothèque de traitement de flux livrée nativement avec Apache Kafka. Elle ne nécessite aucun cluster séparé : votre application Java embarque Kafka Streams comme une simple dépendance Maven, et le traitement parallèle est automatiquement distribué via les groupes de consommateurs Kafka. Cette absence de runtime tiers est un atout opérationnel majeur — pas de cluster Flink ou Spark à maintenir, pas de scheduler à monitorer, pas de courbe d’apprentissage supplémentaire. En 2026, avec Kafka 4.2, la bibliothèque a franchi un cap important : le protocole de rebalance côté serveur (KIP-1071) est passé en disponibilité générale, ce qui rend les redémarrages de topologies stateful beaucoup plus rapides et plus prévisibles.

Ce tutoriel construit pas à pas une application Kafka Streams qui agrège les paiements mobile-money par marchand et par tranche d’une minute, expose un endpoint REST pour interroger l’état en temps réel, et gère proprement les redémarrages sans perdre l’état accumulé.

Prérequis

  • Un cluster Kafka 4.2 fonctionnel
  • JDK 21 et Maven 3.9
  • Une compréhension de base des topics, partitions et groupes de consommateurs Kafka
  • Familiarité avec le concept de stream et de table en analytique
  • Temps estimé : 90 minutes

Étape 1 — Comprendre stream vs table

Le modèle conceptuel de Kafka Streams repose sur une dualité simple à comprendre mais profonde dans ses implications. Un KStream est un flux infini d’événements : chaque message qui arrive est un nouvel enregistrement, indépendant des précédents. Un KTable, en revanche, est une représentation matérialisée d’un état : à chaque message reçu, la valeur courante associée à la clé est remplacée. Le même topic Kafka peut être lu comme un KStream (flux d’événements) ou comme un KTable (état courant) selon le besoin.

Pour un cas concret : le topic paiements-mobile-money lu comme KStream donne accès à tous les paiements de l’histoire. Lu comme KTable indexé par numéro client, il donne en chaque instant le dernier paiement effectué par chaque client — une materialized view avec mise à jour automatique. Les opérations d’agrégation (count, sum, aggregate) transforment naturellement un KStream en KTable.

Étape 2 — Préparer le projet Maven

La dépendance kafka-streams est publiée sur Maven Central et alignée sur la version du broker. On ajoute aussi kafka-clients (transitif mais utile à expliciter) et Jackson pour la sérialisation JSON.

<dependencies>
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>4.2.0</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.21.3</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>2.0.16</version>
  </dependency>
</dependencies>

SLF4J Simple suffit pour le développement ; en production, on préfère Logback ou Log4j 2 avec une configuration explicite des niveaux. La version 4.2.0 est obligatoirement alignée avec le broker pour éviter des surprises sur les KIPs récents — notamment le rebalance protocol côté serveur introduit en 4.2.

Étape 3 — Première topologie : compter les paiements par marchand

La topologie la plus simple consiste à lire le topic des paiements, à grouper par marchand, et à compter. Le résultat est publié sur un nouveau topic et matérialisé dans un state store local que l’application peut interroger.

package io.itskillscenter.kafka.streams;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.Stores;

import java.time.Duration;
import java.util.Properties;

public class PaymentAggregator {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "paiement-aggregator-v1");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);

        StreamsBuilder builder = new StreamsBuilder();

        KStream paiements = builder.stream("paiements-mobile-money");

        KTable comptesParMarchand = paiements
            .filter((k, v) -> v != null)
            .mapValues(v -> {
                try { return MAPPER.readTree(v); }
                catch (Exception e) { return null; }
            })
            .filter((k, j) -> j != null && j.has("marchand_id"))
            .groupBy((k, j) -> j.get("marchand_id").asText(), Grouped.with(Serdes.String(), null))
            .count(Materialized.as("comptes-marchand-store"));

        comptesParMarchand.toStream()
            .to("paiements-comptes-marchand", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}

Trois choses méritent l’attention. processing.guarantee=exactly_once_v2 active les transactions Kafka pour garantir que chaque paiement est compté exactement une fois, même en cas de redémarrage. state.dir doit pointer sur un disque persistant — c’est là que Kafka Streams stocke localement les state stores via RocksDB. num.stream.threads=2 détermine le parallélisme local ; pour un topic à six partitions, distribuer le travail sur deux ou trois threads par instance est typique.

Au démarrage, l’application crée automatiquement un topic de changelog nommé paiement-aggregator-v1-comptes-marchand-store-changelog qui répliquera l’état du store sur Kafka — c’est ce mécanisme qui permet de redémarrer une instance sans perdre l’état accumulé. La restauration depuis le changelog est habituellement deux à dix fois plus rapide que de rejouer le topic source.

Étape 4 — Ajouter une fenêtre temporelle

Compter sur tout l’historique répond à « combien de paiements depuis le début » ; pour répondre à « combien de paiements par marchand cette dernière minute », il faut une windowed aggregation. Kafka Streams supporte trois types de fenêtres : tumbling (non chevauchantes), hopping (chevauchantes), session (regroupent par inactivité).

KTable, Long> comptesParMinute = paiements
    .filter((k, v) -> v != null)
    .mapValues(PaymentAggregator::parseJsonSafe)
    .filter((k, j) -> j != null)
    .groupBy((k, j) -> j.get("marchand_id").asText(), Grouped.with(Serdes.String(), null))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .count(Materialized.as("comptes-marchand-1min"));

comptesParMinute.toStream()
    .map((wk, v) -> new KeyValue<>(wk.key() + "@" + wk.window().startTime(), v))
    .to("paiements-comptes-marchand-1min", Produced.with(Serdes.String(), Serdes.Long()));

La méthode ofSizeWithNoGrace(Duration.ofMinutes(1)) indique des fenêtres de 60 secondes sans période de tolérance pour les événements en retard. En pratique, on accorde souvent une grace period de quelques secondes pour absorber les arrivées légèrement déphasées : ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofSeconds(5)) est un bon réglage par défaut. La clé du résultat est un Windowed<String> qui combine la clé applicative et le début de la fenêtre — c’est pourquoi on aplatit en une chaîne marchand@horodatage avant publication.

Étape 5 — Jointures stream-table

Le vrai pouvoir de Kafka Streams émerge avec les jointures. Imaginons qu’on dispose d’un topic marchands où chaque message décrit un marchand (nom commercial, ville, plafond mensuel). On peut joindre dynamiquement chaque paiement avec les informations du marchand, sans appel synchrone à une base externe.

KTable marchands = builder.table("marchands");

KStream paiementsEnrichis = paiements
    .selectKey((k, v) -> {
        try { return MAPPER.readTree(v).get("marchand_id").asText(); }
        catch (Exception e) { return null; }
    })
    .filter((k, v) -> k != null)
    .leftJoin(marchands, (paiement, marchand) -> {
        try {
            var node = MAPPER.readTree(paiement);
            if (marchand != null) {
                ((com.fasterxml.jackson.databind.node.ObjectNode) node)
                    .set("marchand_info", MAPPER.readTree(marchand));
            }
            return MAPPER.writeValueAsString(node);
        } catch (Exception e) { return paiement; }
    });

paiementsEnrichis.to("paiements-enrichis");

Le leftJoin garantit que chaque paiement est publié, même si le marchand n’existe pas encore dans la table. C’est important pour ne pas perdre d’événement lors d’un démarrage où la table n’est pas encore complètement matérialisée. Côté performance, la jointure est entièrement locale : aucune requête vers Kafka pendant l’opération, l’état du marchand est dans la mémoire RocksDB de l’instance.

Étape 6 — Interroger l’état avec Interactive Queries

Kafka Streams expose une API pour interroger les state stores depuis l’application elle-même. C’est ce qu’on appelle les Interactive Queries : on peut servir un endpoint REST qui répond en moins d’une milliseconde aux requêtes sur le compte courant de chaque marchand, sans aucun appel à Kafka.

// Démarrage d'un mini-HTTP via com.sun.net.httpserver
HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
server.createContext("/marchand", exchange -> {
    String marchand = exchange.getRequestURI().getQuery().split("=")[1];
    var store = streams.store(StoreQueryParameters.fromNameAndType(
        "comptes-marchand-store", QueryableStoreTypes.keyValueStore()));
    Object val = store.get(marchand);
    String body = val == null ? "{\"count\":0}" : "{\"count\":" + val + "}";
    exchange.sendResponseHeaders(200, body.getBytes().length);
    exchange.getResponseBody().write(body.getBytes());
    exchange.close();
});
server.start();

L’appel store.get("BOUTIQUE-ABIDJAN-42") consulte directement RocksDB localement — typiquement 0,1 à 0,5 ms de latence. Cette architecture supprime totalement le besoin d’une base read-only en aval pour servir des dashboards ou des API de consultation. Pour une PME ouest-africaine, c’est une économie d’infrastructure significative : un seul JVM Kafka Streams sur 4 vCPU peut servir aussi bien le traitement temps réel que les requêtes lectures à plusieurs milliers de QPS.

Étape 7 — Gestion d’erreurs et dead letter queue

En production, certains messages sont mal formés — un JSON invalide, une clé null, un schéma rejeté. Sans handler explicite, ces erreurs interrompaient brutalement la topologie. Deux KIPs récents règlent le problème : KIP-1033 (introduit en Kafka 3.9) ajoute un processing exception handler pour gérer les erreurs survenant pendant le traitement avec deux implémentations prêtes (LogAndFail et LogAndContinue). KIP-1034 ajoute la dead letter queue native via la propriété errors.dead.letter.queue.topic.name, supportée par les trois types de handlers (deserialization, processing, production).

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
    "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
props.put("errors.dead.letter.queue.topic.name", "paiements-dlq");

Les messages que la topologie n’arrive pas à désérialiser sont publiés sur le topic paiements-dlq avec leurs headers d’origine — pour analyse ultérieure — et le traitement continue normalement grâce au LogAndContinueExceptionHandler. Cette protection évite qu’un seul message invalide bloque le pipeline entier, un classique qui a fait perdre beaucoup d’heures aux équipes opérationnelles. La même logique s’applique pour les erreurs côté production via DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG et côté processing depuis Kafka 3.9.

Erreurs fréquentes

Erreur Cause Solution
State directory locked by another process Plusieurs instances avec même APPLICATION_ID_CONFIG sur même machine Configurer un state.dir distinct par instance
Lag persistent malgré restart Topic source repartitionné après changement de clé Augmenter num.stream.threads et provisionner plus d’instances
SerializationException bloquante Handler d’erreur en mode FAIL Configurer la DLQ avec response=CONTINUE
Jointure stream-table vide Table pas encore matérialisée au démarrage Utiliser leftJoin ou attendre la fin du bootstrap
RocksDB grandit indéfiniment Pas de politique de rétention sur les state stores Activer cleanup.policy=delete sur le changelog avec retention.ms

Scaling horizontal et tolérance à la panne

Kafka Streams scale par ajout d’instances avec le même application.id. Au démarrage de la seconde instance, le groupe de consommateurs sous-jacent rebalance automatiquement les partitions du topic source et des changelogs entre les deux instances. Chaque instance prend la moitié du travail, et chaque instance dispose localement de la moitié des state stores. Les Interactive Queries deviennent alors distribuées : si l’instance A reçoit une requête pour un marchand dont l’état est sur l’instance B, elle doit déléguer la requête en interne. Kafka Streams expose une API pour découvrir quelle instance détient quelle clé via StreamsMetadata.

Pour la tolérance à la panne, Kafka Streams reconstitue les state stores depuis les topics de changelog en cas de perte d’une instance. Le redémarrage est cependant proportionnel à la taille de l’état : un store de 10 Go met typiquement deux à dix minutes à se reconstituer. Le KIP-441, déjà GA depuis Kafka 2.6, atténue cela via le concept de warm standby : on peut configurer une réplique de chaque state store sur une autre instance, prête à reprendre immédiatement. Cela double l’espace disque consommé, mais ramène le temps de bascule à quelques secondes.

Coût infrastructure

Une application Kafka Streams typique pour une PME ouest-africaine — disons cinq cents marchands suivis, deux fenêtres temporelles, une jointure avec un référentiel — tient confortablement sur une instance unique de 4 vCPU et 8 Go de RAM. Sur Hetzner CPX31 à environ 21 euros par mois (grille tarifaire d’avril 2026), on dispose de 4 vCPU, 8 Go de RAM et 240 Go de NVMe — largement de quoi héberger plusieurs gigaoctets de state stores RocksDB. Pour la résilience, deux instances sur deux zones (Helsinki et Falkenstein) coûtent autour de 42 euros par mois et tolèrent la perte complète d’un datacenter.

Ressources et références

Comparer Kafka Streams aux alternatives

Avant de s’engager sur Kafka Streams, il vaut la peine de positionner la bibliothèque dans le paysage du stream processing. Apache Flink est la référence côté performance pure et richesse fonctionnelle (CEP, gestion native du temps évenementiel, fenêtres très flexibles) ; il demande en revanche un cluster dédié et une équipe qui sait l’opérer. Apache Spark Structured Streaming est plus orienté micro-batches et excelle quand on combine du SQL analytique sur des fenêtres de l’ordre de la minute. Faust, en Python, séduit les équipes data science mais ne couvre qu’une partie des fonctionnalités. Materialize et RisingWave proposent une approche SQL streaming sur lakehouse — pertinente pour de l’analytique en temps réel sans Java.

Kafka Streams brille quand l’équipe est déjà sur la JVM, quand on veut éviter un cluster supplémentaire et quand le besoin tient dans la sémantique exactly-once garantie par Kafka. Pour des charges de quelques milliers d’événements par seconde et des topologies modérément complexes — l’usage le plus courant en PME — c’est le choix de moindre friction.

Tests unitaires d’une topologie

Un avantage opérationnel de Kafka Streams trop rarement exploité : la TopologyTestDriver. Cette classe permet de tester une topologie complète en mémoire, sans broker, en injectant des événements et en inspectant les sorties. Elle réside dans l’artefact Maven kafka-streams-test-utils qu’il faut ajouter en dépendance de test. Pour rendre la topologie testable, on extrait sa construction dans une méthode statique buildTopology(StreamsBuilder) appelée depuis main et depuis les tests.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>4.2.0</version>
    <scope>test</scope>
</dependency>

Le test ci-dessous valide que notre comptage par marchand fonctionne correctement.

import org.apache.kafka.streams.*;
import org.apache.kafka.streams.test.TestRecord;

@Test
void compteur_increment_par_marchand() {
    StreamsBuilder builder = new StreamsBuilder();
    PaymentAggregator.buildTopology(builder);
    Topology topology = builder.build();
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
        TestInputTopic input = driver.createInputTopic(
            "paiements-mobile-money", new StringSerializer(), new StringSerializer());
        input.pipeInput("+221", "{\"marchand_id\":\"M1\",\"montant\":1000}");
        input.pipeInput("+221", "{\"marchand_id\":\"M1\",\"montant\":2000}");

        KeyValueStore store = driver.getKeyValueStore("comptes-marchand-store");
        assertEquals(2L, store.get("M1"));
    }
}

Un test de ce type tourne en moins de 100 millisecondes et permet d’itérer rapidement sur la logique métier. C’est la pratique recommandée pour toute topologie sérieuse — la TDD sur Kafka Streams change radicalement la confiance qu’on peut avoir avant un déploiement.

Le rebalance protocol côté serveur en pratique

L’arrivée du protocole de rebalance côté serveur en GA dans Kafka 4.2 (KIP-1071) est plus qu’un détail technique. Dans les versions précédentes, chaque rebalance entrait dans une phase de coordination entre tous les consumers du groupe : on suspendait le traitement, on calculait l’assignation, on synchronisait, on reprenait. Avec un grand nombre d’instances et de partitions, cette opération pouvait durer plusieurs dizaines de secondes pendant lesquelles l’application était figée. Le nouveau protocole déplace la décision côté broker — le contrôleur calcule l’assignation, la pousse aux consumers, et ces derniers continuent leur travail pendant la transition. KIP-1071 supprime le point de synchronisation global ; en pratique, plusieurs équipes rapportent des temps de rebalance divisés par dix à vingt sur des topologies de taille moyenne (KIP-848, le pendant côté consumer, a documenté un cas type 103 secondes → 5 secondes sur un groupe de 10 consumers et 900 partitions).

Pour activer ce protocole, on positionne group.protocol=consumer côté consumer ou directement dans les propriétés Streams. Le passage se fait sans changement de code applicatif — seule la configuration évolue. C’est ce type d’amélioration silencieuse qui justifie de monter rapidement vers Kafka 4.2 plutôt que de rester sur 3.9 LTS au-delà de la fenêtre de migration ZooKeeper.

Observabilité Streams en production

Au-delà des métriques broker classiques, Kafka Streams expose ses propres compteurs JMX qu’il est crucial de monitorer. kafka.streams:type=stream-metrics,client-id=* donne le débit global. kafka.streams:type=stream-thread-metrics,thread-id=* détaille la santé de chaque thread — un thread bloqué pendant plus de 30 secondes est un signal d’incident. kafka.streams:type=stream-task-metrics,task-id=* remonte le lag par tâche, équivalent au lag par partition pour la topologie. Avec un exporter Prometheus configuré sur l’application et un dashboard Grafana adapté, on a une vision complète de ce que fait l’application en moins de cinq minutes de setup.

Patterns d’évolution d’une topologie en production

Modifier une topologie Kafka Streams en production demande quelques précautions. Toute modification structurelle — ajout d’une jointure, changement du type d’agrégation, modification du keyselector — invalide les state stores existants et impose une reconstruction depuis le changelog ou depuis le topic source. La règle prudente : on incrémente le suffixe de version de l’application.id à chaque changement non rétrocompatible (par exemple paiement-aggregator-v2), on déploie en parallèle, on bascule le trafic, puis on retire l’ancienne version après vérification. Cette discipline évite les états corrompus et les retours en arrière douloureux.


Retour au guide principal : Apache Kafka en production en 2026 : panorama complet.

Service ITSkillsCenter

Site ou application web sur mesure

Conception Pro + Nom de domaine 1 an + Hébergement 1 an + Formation + Support 6 mois. Accès et code livrés. À partir de 350 000 FCFA.

Demander un devis
Publicité