تطوير الويب

Kafka 4.2: producers وconsumers idempotents في Java

5 min de lecture

السلسلة: هذا الدرس جزء من سلسلة Kafka 4.2. اقرأ المقال الرئيسي.

الخطأ الأكثر إيلاماً في event streaming ليس فقدان رسالة — Kafka يحمي منه جيداً. إنه الازدواج الصامت: دفعة مُحسَبة مرتين، email مُرسَل مزدوجاً، طلب مُكرَّر. منذ Kafka 3.0 (سبتمبر 2021)، producer idempotent مفعَّل افتراضياً عبر KIP-679، والإصدار 4.2 عزّز السلسلة بإدارة أفضل للأخطاء جانب broker.

المتطلبات

  • Cluster Kafka 4.2 شغّال
  • JDK 21 أو 25 مثبَّت محلياً
  • Maven 3.9 أو Gradle 8.10
  • أساسيات Java
  • 75 دقيقة

الخطوة 1 — فهم idempotence جانب producer

حين ينشر Producer Kafka رسالة، ينتظر إقراراً من broker. إن لم يصل في الوقت — قطع شبكي، GC pause، انتخاب broker leader — producer يُعيد الإرسال آلياً. بلا حماية، هذا السلوك يخلق ازدواجيات.

producer idempotent يحلّ هذا بإسناد Producer ID (PID) فريد لكل producer وترقيم كل رسالة بـ sequence number رتيب لكل partition. broker يحفظ حالة لكل (PID، partition) ويرفض صامتاً أي ازدواج تسلسل. لا تدخّل تطبيقي مطلوب.

ثلاث خصائص مُقيَّدة آلياً حين idempotence نشطة: acks=all، max.in.flight.requests.per.connection ≤ 5، وretries > 0.

الخطوة 2 — كتابة producer Java idempotent

<!-- pom.xml -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>4.2.0</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.21.3</version>
</dependency>
package io.itskillscenter.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Map;
import java.util.Properties;

public class PaymentProducer {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092,kafka2.example.com:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // idempotence — مفعَّلة افتراضياً منذ 3.0
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

    // تحسينات معيارية
    props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 100; i++) {
        Map<String, Object> payment = Map.of(
          "id", "PAY-" + i,
          "client", "+966501234" + (000 + i),
          "montant", 5000 + i * 250,
          "moyen", i % 2 == 0 ? "Mada" : "STC Pay"
        );
        String key = (String) payment.get("client");
        String value = MAPPER.writeValueAsString(payment);
        ProducerRecord<String, String> record =
          new ProducerRecord<>("paiements-mobile-money", key, value);

        producer.send(record, (meta, ex) -> {
          if (ex != null) System.err.println("Echec: " + ex.getMessage());
          else System.out.println("Envoye partition=" + meta.partition() + " offset=" + meta.offset());
        });
      }
      producer.flush();
    }
  }
}

try-with-resources جوهري: يضمن استدعاء close() حتى عند استثناء، ما يفرض flush للـ batches في الانتظار وتحرير PID لدى broker. linger.ms=20 وضغط zstd يضيفان ربح throughput معتبراً — نمطياً 30-50% توفير bande passante على JSON.

الخطوة 3 — فهم idempotence جانب consumer

consumer Kafka افتراضياً ليس idempotent. التسلسل الكلاسيكي: consumer يعالج 100 رسالة، يُعيد commit لـ 50، ثم ينهار قبل commit الـ 50 الباقية. عند إعادة التشغيل، يستأنف من offset 50 ويُعيد معالجة الـ 50 المتبقية — التي عُولِجت سلفاً. الازدواج تطبيقي وغير مرئي لـ Kafka.

ثلاثة أنماط تعالج هذا. idempotence تجارية: نضمن أن كتابة نفس الدفعة مرتين تنتج سطراً واحداً، عبر قيد فريد أو upsert. transactions Kafka لـ exactly-once طرف لطرف. offset store خارجي transactionnel.

الخطوة 4 — تطبيق consumer مع idempotence تجارية

package io.itskillscenter.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class IdempotentPaymentConsumer {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "paiements-comptable-v1");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);

    DataSource ds = buildDataSource(); // HikariCP موصى به

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("paiements-mobile-money"));
      while (!Thread.currentThread().isInterrupted()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
        if (records.isEmpty()) continue;

        try (Connection conn = ds.getConnection()) {
          conn.setAutoCommit(false);
          try (PreparedStatement ps = conn.prepareStatement(
              "INSERT INTO paiements (id, client, montant, moyen) " +
              "VALUES (?, ?, ?, ?) ON CONFLICT (id) DO NOTHING")) {
            for (ConsumerRecord<String, String> rec : records) {
              JsonNode p = MAPPER.readTree(rec.value());
              ps.setString(1, p.get("id").asText());
              ps.setString(2, p.get("client").asText());
              ps.setInt(3, p.get("montant").asInt());
              ps.setString(4, p.get("moyen").asText());
              ps.addBatch();
            }
            ps.executeBatch();
          }
          conn.commit();
        }
        consumer.commitSync();
      }
    }
  }
}

نقطتان رئيسيتان. enable.auto.commit=false وcommitSync() يدوي: نتحكّم بدقة متى تُنشر offsets. ON CONFLICT (id) DO NOTHING: إن أُعيد حقن نفس الدفعة، تُتجاهَل صامتة. isolation.level=read_committed مهمّ إن استخدم منتجون آخرون transactions.

الخطوة 5 — Kafka transactions لـ exactly-once حقيقي

حين تكون idempotence التجارية مستحيلة — مثلاً workflow يقرأ topic، يحسب agrégat، وينشر على topic آخر — Kafka يقترح transactions. producer يفتح transaction، يكتب على N topics، يُعلّم offsets المصدر مُستهلَكة، ثم يلتزم بالمجموعة ذرّياً.

Properties txProps = new Properties();
txProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
txProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-aggregateur-v1");
txProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
txProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(txProps);
producer.initTransactions();

try {
  producer.beginTransaction();
  producer.send(new ProducerRecord<>("paiements-aggregates", "Mada", "{...}"));
  producer.send(new ProducerRecord<>("paiements-aggregates", "STC", "{...}"));
  producer.sendOffsetsToTransaction(currentOffsets, consumer.groupMetadata());
  producer.commitTransaction();
} catch (Exception e) {
  producer.abortTransaction();
  throw e;
}

transactional.id يجب أن يكون مستقرّاً وفريداً لكل نسخة تطبيقية. إن أطلقت pod Kubernetes ثانياً بنفس transactional.id، الثاني يُلغي آلياً transactions الأول — آلية fencing التي تمنع zombies.

الخطوة 6 — قياس ورصد

ثلاثة مقاييس تُرصَد بأولوية مع exporter JMX وPrometheus. kafka_consumer_records_consumed_total يعطي throughput الفعلي. kafka_consumer_lag_sum يكشف consumers المتأخرة. kafka_producer_record_error_total يرفع أخطاء الإرسال غير القابلة لإعادة الإرسال.

إشارة نجاح واضحة: مع producer idempotent وconsumer مكتوب كالخطوة 4، يجب أن تستطيع قتل consumer وحشاً، إعادة تشغيله، ومعاينة أنه لم تظهر أي سطر مكرَّر. اختبار بسيط: kill -9 على consumer في عُمق poll، إعادة تشغيل، ثم SELECT COUNT(*), COUNT(DISTINCT id) FROM paiements — العدّادان يجب أن يكونا متساويين تماماً.

أخطاء شائعة

الخطأ السبب الحل
OutOfOrderSequenceException Producer مُعاد بـ transactional.id مكرَّر معرّف فريد لكل نسخة
ازدواجيات في القاعدة رغم idempotence قيد فريد مفقود أضف PRIMARY KEY (id) أو فهرس فريد
InvalidTxnStateException commitTransaction() خارج transaction مفتوح غلّف كل كتابة بـ beginTransaction()
Consumer يُعيد القراءة في حلقة commit يدوي منسي فعّل enable.auto.commit أو استدع commitSync()
Lag لا ينخفض أبداً max.poll.records ضعيف أو معالجة بطيئة profile المعالجة وزِد max.poll.records

متى نستخدم ماذا — قرار سريع

إن كتب تطبيقك في قاعدة علائقية أو نظام يدعم upserts طبيعياً، استخدم idempotence تجارية: بسيطة، مُؤدِّية ومقروءة. الكلفة التشغيلية منخفضة والـ throughput يبقى أقصى.

إن كان تطبيقك Kafka-to-Kafka صرفاً — مثل agrégateur يقرأ تدفق أحداث، يحسب، يُعيد النشر — transactions لا غنى عنها. تُدخل كموناً إضافياً 10-30 مللي ثانية لكل batch وكلفة CPU 5-10% مقارنة بـ at-least-once.

فخ rebalance الكلاسيكي

حين يلتحق consumer أو يغادر المجموعة، group coordinator يُعيد توزيع partitions. خلال هذا التوزيع، consumer قيد المعالجة قد يفقد partitions ولا يستطيع commit offsets المحسوبة لتوّه. عند الاستئناف، consumer آخر سيقرأ نفس الرسائل من آخر offset مُلتزَم.

consumer.subscribe(List.of("paiements-mobile-money"), new ConsumerRebalanceListener() {
  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // commit ما عولج قبل فقد partition
    consumer.commitSync(currentOffsets);
  }
  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
});

منذ Kafka 4.0 وKIP-848، تفعيل بروتوكول rebalance الجديد جانب broker بضبط group.protocol=consumer يُقلّص بشكل جذري مدة rebalances.

Headers وtracing موزَّع

ممارسة جيدة في الإنتاج: نشر correlation ID في headers record Kafka لتتبّع حدث طرفاً لطرف.

ProducerRecord<String, String> record = new ProducerRecord<>("paiements-mobile-money", key, value);
record.headers().add("correlation-id", UUID.randomUUID().toString().getBytes());
record.headers().add("source", "api-mobile-money-v3".getBytes());
producer.send(record);

اختيار مفتاح partition

اختيار مفتاح الرسالة له أثر مباشر على جودة idempotence التجارية. مفتاح حسن الاختيار يجمع الأحداث ذات التماسك: كل دفعات نفس العميل، كل طلبات نفس المتجر. هذا يضمن الترتيب داخل المفتاح — Kafka يحفظ الترتيب لكل partition — ويُيسّر حسابات idempotent جانب consumer.

مقالات ذات صلة

Sponsoriser ce contenu

Cet emplacement est à vous

Position premium en fin d'article — c'est l'instant où les lecteurs sont le plus engagés. Réservez cet espace pour votre marque, votre formation ou votre offre.

Recevoir nos tarifs
Publicité