Développement Web

Kafka 4.2 : producers et consumers idempotents en Java

15 min de lecture

Guide principal : Apache Kafka en production en 2026 : panorama complet

Tutoriels reliés : Démarrer Kafka 4.2 en mode KRaft · Kafka Streams 4.2 — agrégations et fenêtres

L’erreur la plus pénible en event streaming n’est pas la perte d’un message — Kafka se protège bien contre cela. C’est le doublon silencieux : un paiement comptabilisé deux fois, un email envoyé en double, une commande répliquée. Depuis Kafka 3.0 (septembre 2021), le producer idempotent est activé par défaut via le KIP-679, et la version 4.2 a renforcé encore la chaîne avec une meilleure gestion des erreurs côté broker. Mais l’idempotence côté producteur ne règle qu’une partie du problème : sans une chaîne complète exactly-once ou un consumer écrit avec discipline, le doublon peut réapparaître au moment du commit d’offset.

Ce tutoriel construit pas à pas un producer Java idempotent puis un consumer qui implémente correctement le motif read-process-commit, et termine par la mise en place des transactions Kafka pour le cas où l’on a vraiment besoin de la garantie exactly-once de bout en bout. Tout le code est testé contre Apache Kafka 4.2.0 et le client kafka-clients 4.2.0 publié sur Maven Central.

Prérequis

  • Un cluster Kafka 4.2 fonctionnel — voir le tutoriel précédent pour le montage en mode KRaft
  • JDK 21 ou 25 installé localement
  • Maven 3.9 ou Gradle 8.10
  • Bases solides en Java — collections, threads, try-with-resources
  • Temps estimé : 75 minutes

Étape 1 — Comprendre l’idempotence côté producteur

Quand un producer Kafka publie un message, il attend un accusé de réception du broker. Si l’accusé n’arrive pas en temps voulu — coupure réseau, GC pause, broker leader qui élit en plein milieu — le producer retransmet automatiquement. Sans protection, ce comportement crée des doublons : le broker a peut-être bel et bien écrit le message la première fois, mais l’ACK a été perdu en route.

Le producer idempotent règle ce problème en attribuant à chaque producteur un Producer ID (PID) unique et en numérotant chaque message avec un sequence number monotone par partition. Le broker tient un état pour chaque (PID, partition) et refuse silencieusement tout doublon de séquence. Aucune intervention applicative requise : la déduplication est entièrement transparente. Cette mécanique est décrite dans le KIP-98 et fait partie du moteur Kafka depuis la version 0.11 ; ce qui change en 2026, c’est que la configuration enable.idempotence=true est activée par défaut depuis 3.0 et qu’aucune raison sérieuse ne justifie de la désactiver.

Trois propriétés sont automatiquement contraintes quand l’idempotence est active : acks=all (les répliques doivent toutes confirmer), max.in.flight.requests.per.connection ≤ 5, et retries > 0. Toute tentative de configurer un de ces paramètres en conflit avec l’idempotence remontera une ConfigException au démarrage.

Étape 2 — Écrire un producer Java idempotent

Le code ci-dessous est un producer prêt pour la production : il sérialise des paiements en JSON, configure explicitement les garanties d’idempotence, gère les erreurs synchrones et asynchrones, et ferme proprement la ressource. Il s’appuie uniquement sur la bibliothèque kafka-clients de la distribution officielle Apache.

<!-- pom.xml -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.21.3</version>
</dependency>
package io.itskillscenter.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

public class PaymentProducer {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092,kafka2.example.com:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Idempotence — activée par défaut depuis 3.0, mais on l'explicite pour la doc
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // Optimisations standard
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");

        try (KafkaProducer producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                Map payment = Map.of(
                    "id", "PAY-" + i,
                    "client", "+22177123" + (4000 + i),
                    "montant", 5000 + i * 250,
                    "moyen", i % 2 == 0 ? "Wave" : "Orange Money"
                );
                String key = (String) payment.get("client");
                String value = MAPPER.writeValueAsString(payment);
                ProducerRecord record =
                    new ProducerRecord<>("paiements-mobile-money", key, value);

                producer.send(record, (meta, ex) -> {
                    if (ex != null) {
                        System.err.println("Échec envoi: " + ex.getMessage());
                    } else {
                        System.out.println("Envoyé partition=" + meta.partition() +
                                           " offset=" + meta.offset());
                    }
                });
            }
            producer.flush();
        }
    }
}

Le try-with-resources est essentiel : il garantit que close() est appelé même en cas d’exception, ce qui force le flush des batches en attente et la libération du PID auprès du broker. Sans cette précaution, jusqu’à 32 Ko de messages bufferisés peuvent être perdus. Le linger.ms=20 et la compression zstd ajoutent un gain de débit substantiel — typiquement 30 à 50 % de bande passante économisée sur des charges JSON — sans dégrader sensiblement la latence perçue.

On exécute le code avec mvn package && java -cp target/kafka-app-1.0.jar io.itskillscenter.kafka.PaymentProducer. La sortie liste les cent envois ; pour les retrouver côté topic, on utilise kafka-console-consumer.sh --from-beginning --topic paiements-mobile-money --bootstrap-server kafka1.example.com:9092.

Étape 3 — Comprendre l’idempotence côté consumer

Le consumer Kafka, par défaut, n’est pas idempotent. Il s’abonne à un topic, reçoit des batches de messages, les traite, puis valide ses offsets pour signaler la progression. Le problème classique est la séquence suivante : le consumer traite cent messages, en commit cinquante, puis crashe avant de commiter les cinquante autres. Au redémarrage, il reprendra à partir de l’offset cinquante et retraitera les cinquante messages restants — qui ont déjà été traités. Le doublon est applicatif, et il est invisible à Kafka.

Trois patterns adressent cette difficulté. Le premier, le plus fréquent, consiste à rendre le traitement applicatif idempotent : on s’assure qu’écrire deux fois le même paiement en base ne provoque qu’une seule ligne, via une contrainte d’unicité ou un upsert. C’est ce qu’on appellera l’idempotence métier. Le second pattern, plus sophistiqué, utilise les transactions Kafka pour un exactly-once de bout en bout. Le troisième s’appuie sur un offset store externe transactionnel — typiquement la même base que celle qui reçoit les écritures applicatives — qu’on commit dans la même transaction.

Étape 4 — Implémenter un consumer avec idempotence métier

L’approche la plus pragmatique pour la grande majorité des applications est de rendre l’opération de traitement idempotente. Le code suivant consomme les paiements, les écrit en PostgreSQL via une instruction INSERT ... ON CONFLICT DO NOTHING, puis commit les offsets manuellement.

package io.itskillscenter.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class IdempotentPaymentConsumer {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "paiements-comptable-v1");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);

        DataSource ds = buildDataSource(); // HikariCP recommandé

        try (KafkaConsumer consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("paiements-mobile-money"));
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords records = consumer.poll(Duration.ofSeconds(2));
                if (records.isEmpty()) continue;

                try (Connection conn = ds.getConnection()) {
                    conn.setAutoCommit(false);
                    try (PreparedStatement ps = conn.prepareStatement(
                            "INSERT INTO paiements (id, client, montant, moyen) " +
                            "VALUES (?, ?, ?, ?) ON CONFLICT (id) DO NOTHING")) {
                        for (ConsumerRecord rec : records) {
                            JsonNode p = MAPPER.readTree(rec.value());
                            ps.setString(1, p.get("id").asText());
                            ps.setString(2, p.get("client").asText());
                            ps.setInt(3, p.get("montant").asInt());
                            ps.setString(4, p.get("moyen").asText());
                            ps.addBatch();
                        }
                        ps.executeBatch();
                    }
                    conn.commit();
                }
                consumer.commitSync();
            }
        }
    }
    // buildDataSource() omis pour la concision
}

Deux points clés. D’abord, enable.auto.commit=false et commitSync() manuel : on contrôle exactement quand les offsets sont publiés, donc le redémarrage est prévisible. Ensuite, ON CONFLICT (id) DO NOTHING : si le même paiement est réinjecté, il est silencieusement ignoré côté base. La combinaison écriture-puis-commit garantit qu’on ne perd jamais de message, et l’idempotence métier garantit qu’on n’en duplique jamais. isolation.level=read_committed est important si d’autres producteurs utilisent des transactions : sans cette ligne, le consumer pourrait lire des messages encore non commités.

Étape 5 — Transactions Kafka pour un exactly-once vrai

Quand l’idempotence métier est impossible — par exemple un workflow qui lit un topic, calcule un agrégat et publie sur un autre topic — Kafka offre des transactions. Le producer ouvre une transaction, écrit sur N topics, marque les offsets sources comme consommés, puis commit l’ensemble en atomique. Tout ou rien.

Properties txProps = new Properties();
txProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-aggregateur-v1");
txProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
txProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer producer = new KafkaProducer<>(txProps);
producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("paiements-aggregates", "Wave", "{...}"));
    producer.send(new ProducerRecord<>("paiements-aggregates", "OM", "{...}"));
    producer.sendOffsetsToTransaction(currentOffsets, consumer.groupMetadata());
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
    throw e;
}

transactional.id doit être stable et unique par instance applicative. Si vous lancez deux pods Kubernetes avec le même transactional.id, le second invalide automatiquement les transactions du premier — c’est le mécanisme de fencing qui empêche les zombies. sendOffsetsToTransaction remplace le commitSync du consumer : les offsets sont publiés à l’intérieur de la transaction, garantissant que la consommation et la production sont atomiquement liées.

Étape 6 — Mesurer et observer

Sans observation, on ne sait pas si la chaîne fonctionne. Trois métriques sont à monitorer en priorité avec un exporter JMX et Prometheus. kafka_consumer_records_consumed_total par topic et par groupe donne le débit réel. kafka_consumer_lag_sum par partition révèle les consommateurs en retard. kafka_producer_record_error_total remonte les erreurs d’envoi non retransmissibles.

Un signal de réussite clair : avec un producer idempotent et un consumer écrit comme à l’étape 4, vous devez pouvoir tuer brutalement le consumer en plein traitement, le relancer, et constater qu’aucune ligne en double n’est apparue côté base. Un test simple : kill -9 sur le consumer en plein poll, redémarrage, puis SELECT COUNT(*), COUNT(DISTINCT id) FROM paiements — les deux compteurs doivent être strictement égaux.

Erreurs fréquentes

Erreur Cause Solution
OutOfOrderSequenceException Producer redémarré avec un transactional.id dupliqué Veiller à un identifiant unique par instance
Doublons en base malgré idempotence Contrainte d’unicité manquante côté table Ajouter PRIMARY KEY (id) ou un index unique
InvalidTxnStateException commitTransaction() appelé hors d’une transaction ouverte Encadrer toute écriture par beginTransaction()
Consumer relit en boucle commit manuel oublié Activer enable.auto.commit=true ou appeler explicitement commitSync()
Lag qui ne baisse jamais max.poll.records trop faible ou traitement trop lent Profiler le traitement et augmenter max.poll.records

Quand utiliser quoi — décision rapide

Le choix entre idempotence métier et transactions Kafka n’est pas idéologique, c’est une affaire d’architecture. Si votre application écrit dans une base relationnelle ou un système qui supporte naturellement les upserts, utilisez l’idempotence métier : c’est simple, performant et lisible. Le coût opérationnel est faible et le débit reste maximal.

Si en revanche votre application est purement Kafka-to-Kafka — par exemple un agrégateur qui lit un flux d’événements, calcule, republie — les transactions sont incontournables. Elles introduisent une latence supplémentaire d’environ 10 à 30 millisecondes par batch en raison du protocole en deux phases, et un coût CPU sur le broker d’environ 5 à 10 % par rapport à un mode at-least-once. Pour la plupart des charges, c’est acceptable ; pour des charges très haute fréquence (au-delà de 200 000 messages/seconde par partition), on évalue au cas par cas.

Coûts et contexte ouest-africain

Une charge typique de plateforme mobile money en Côte d’Ivoire ou au Sénégal — disons un million de transactions par jour, soit environ douze messages par seconde en moyenne, avec des pics à cinquante — tient largement dans un cluster à trois brokers Hetzner CX32. La JVM Kafka tourne avec deux à quatre Go de heap, ce qui laisse de la marge pour le buffer cache du système. La consommation réseau effective avec compression zstd descend autour de 200 Ko/s en pic, ce qui ne pose aucune contrainte sur les liens 1 Gbps standard des VPS européens. Pour ce volume, le surcoût des transactions est invisible en pratique.

Ressources et références

Idempotent vs at-least-once vs exactly-once : repère mental

Le vocabulaire Kafka mélange trois notions qu’il vaut la peine de séparer mentalement. At-least-once est la garantie de base : aucun message confirmé ne sera perdu, mais des doublons peuvent apparaître. C’est le mode par défaut sans idempotence ni transactions, et c’est suffisant pour beaucoup de cas d’usage où le traitement est intrinsèquement idempotent (calcul d’agrégat à partir d’un état complet, indexation Meilisearch, etc.).

Idempotent producer ajoute la garantie qu’un même producer ne créera pas de doublon côté broker, même en cas de retransmission après timeout réseau. C’est la mécanique PID + sequence number décrite plus haut. La garantie est par session de producer : si le producer redémarre, un nouveau PID est attribué et les anciens éventuellement renvoyés peuvent recréer des doublons. C’est précisément cette limite que les transactions règlent.

Exactly-once semantics (EOS) combine producer idempotent, transactions et consumer en mode read_committed pour fournir une garantie de bout en bout. Un message est traité exactement une fois, peu importe les pannes, redémarrages, élections de leader ou pertes réseau. C’est le mode utilisé en interne par Kafka Streams quand on active processing.guarantee=exactly_once_v2, et c’est la garantie la plus forte que Kafka offre nativement.

Le piège classique du rebalance

Un sujet sous-estimé dans les premiers déploiements : le rebalance du groupe de consommateurs. Quand un consumer rejoint ou quitte le groupe, ou que les heartbeats deviennent erratiques, le group coordinator redistribue les partitions. Pendant cette redistribution, le consumer en cours de traitement peut perdre ses partitions et ne plus pouvoir commiter les offsets qu’il vient de calculer. Au reprise, un autre consumer va lire les mêmes messages depuis le dernier offset commité — donc retraiter.

Deux protections concrètes. D’abord, écouter le rebalance listener et committer les offsets juste avant la perte de partition. Ensuite, depuis Kafka 4.0 et le KIP-848, activer le nouveau consumer rebalance protocol côté broker en positionnant group.protocol=consumer ; il réduit drastiquement la durée des rebalances et supprime le point de synchronisation global. KIP-1071 fait la même chose pour Kafka Streams (GA en 4.2).

consumer.subscribe(List.of("paiements-mobile-money"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection partitions) {
        // Committer ce qu'on a déjà traité avant de perdre la partition
        consumer.commitSync(currentOffsets);
    }
    @Override
    public void onPartitionsAssigned(Collection partitions) {
        // Initialiser l'état spécifique à la partition si besoin
    }
});

Ce listener s’invoque automatiquement par le client Kafka lors d’un rebalance et donne la dernière fenêtre pour synchroniser l’état applicatif avec les offsets committed. Combiné à un traitement idempotent côté base, il élimine la quasi-totalité des doublons opérationnels.

Headers et tracing distribué

Une bonne pratique en production : propager un correlation ID dans les record headers Kafka pour tracer un événement de bout en bout. Cela facilite le débogage des cas où un message semble dupliqué : on retrouve la trace dans les logs applicatifs avec le même ID. Le header est négligeable en taille (16 à 36 caractères) et n’affecte pas la sérialisation du payload.

ProducerRecord record = new ProducerRecord<>("paiements-mobile-money", key, value);
record.headers().add("correlation-id", UUID.randomUUID().toString().getBytes());
record.headers().add("source", "api-mobile-money-v3".getBytes());
producer.send(record);

Côté consumer, on récupère les headers via rec.headers().lastHeader("correlation-id"). C’est avec ce type de traçabilité qu’on peut auditer après-coup une réclamation client du type « j’ai été débité deux fois » : on remonte l’ID de paiement et on liste tous les événements liés à travers les microservices.

Choix de la clé de partition

Le choix de la clé de message a un impact direct sur la qualité de l’idempotence métier. Une clé bien choisie regroupe les événements qui partagent une cohérence : tous les paiements d’un même client, toutes les commandes d’une même boutique, tous les capteurs d’un même véhicule. Cela garantit l’ordre intra-clé — Kafka préserve l’ordre par partition — et facilite les calculs idempotents côté consumer. Pour les paiements mobiles, la clé naturelle est le numéro du client en format international. Pour des logs applicatifs, c’est souvent l’identifiant de transaction métier. Une mauvaise clé, comme un timestamp ou un identifiant aléatoire, disperse l’ordre et complique tout traitement stateful en aval.


Retour au guide principal : Apache Kafka en production en 2026 : panorama complet.

Service ITSkillsCenter

Site ou application web sur mesure

Conception Pro + Nom de domaine 1 an + Hébergement 1 an + Formation + Support 6 mois. Accès et code livrés. À partir de 350 000 FCFA.

Demander un devis
Publicité