تطوير الويب

Kafka Streams 4.2 في Java: agrégations، نوافذ وjointures

4 دقائق للقراءة

السلسلة: هذا الدرس جزء من سلسلة Kafka 4.2. اقرأ المقال الرئيسي.

Kafka Streams 4.2 المكتبة الرسمية لمعالجة التدفقات: تُدمَج كتبعية Maven في تطبيق Java معياري — بلا cluster منفصل، بلا scheduler، بلا runtime ثالث. الحالة المتراكمة محفوظة محلياً في RocksDB ومُنسَخة عبر topics changelog Kafka. التوازي آلي عبر مجموعات consumers.

المتطلبات

  • Cluster Kafka 4.2 شغّال
  • JDK 21 أو 25
  • Maven 3.9
  • topic paiements-mobile-money مع رسائل JSON
  • 75 دقيقة

الخطوة 1 — Setup المشروع

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>4.2.0</version>
</dependency>

الخطوة 2 — Topology بسيطة: KStream والتصفية

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "paiements-analytics-v1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// منذ 4.2: بروتوكول rebalance خادم GA (KIP-1071)
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> paiements = builder.stream("paiements-mobile-money");

// تصفية الدفعات > 100 000 FCFA وإعادة النشر
paiements
  .filter((key, value) -> {
    JsonNode node = MAPPER.readTree(value);
    return node.get("montant").asLong() > 100_000;
  })
  .to("paiements-gros-montants");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

processing.guarantee=exactly_once_v2 يضمن أن كل رسالة من المصدر تنتج رسالة واحدة فقط على topic الوجهة، حتى عند إعادة التشغيل. group.protocol=streams يُفعّل بروتوكول rebalance الجديد (KIP-1071) — مدّة rebalance تنخفض من 30+ ثانية إلى أقل من ثانية.

الخطوة 3 — Agrégation فتراتية

عدّ الدفعات لكل مرشد كل دقيقة:

KStream<String, String> paiements = builder.stream("paiements-mobile-money");

KTable<Windowed<String>, Long> comptageParMarchand = paiements
  .selectKey((key, value) -> MAPPER.readTree(value).get("marchand_id").asText())
  .groupByKey()
  .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
  .count(Materialized.as("comptage-marchand-1min"));

comptageParMarchand
  .toStream()
  .map((windowedKey, count) -> KeyValue.pair(
    windowedKey.key() + "@" + windowedKey.window().startTime(),
    count.toString()))
  .to("comptage-marchand-output");

TimeWindows.ofSizeWithNoGrace ينشئ نوافذ ثابتة 1 دقيقة بلا فترة سماح للوصول المتأخّر. Materialized.as("comptage-marchand-1min") يُسمّى store RocksDB المحلي — اسم استعلامي لاحقاً.

الخطوة 4 — Jointure stream-table

إثراء الدفعات ببيانات المرشد من جدول مرجعي:

KTable<String, String> marchands = builder.table("marchands-referentiel");
KStream<String, String> paiements = builder.stream("paiements-mobile-money");

KStream<String, String> enrichi = paiements
  .selectKey((k, v) -> MAPPER.readTree(v).get("marchand_id").asText())
  .join(marchands, (paiement, infoMarchand) -> {
    ObjectNode result = MAPPER.createObjectNode();
    result.set("paiement", MAPPER.readTree(paiement));
    result.set("marchand", MAPPER.readTree(infoMarchand));
    return result.toString();
  });

enrichi.to("paiements-enrichis");

Jointure stream-table تطابق كل رسالة paiement مع آخر قيمة معروفة للمرشد المقابل. إن لم يوجد المرشد في الجدول، الرسالة تُتجاهَل (inner join). للاحتفاظ بكل الرسائل، استخدم leftJoin.

الخطوة 5 — Interactive Queries لخدمة REST

قيمة Kafka Streams: الحالة المتراكمة قابلة للاستعلام مباشرة من تطبيقك. لا حاجة لقاعدة بيانات منفصلة.

// Spring Boot endpoint
@GetMapping("/comptage/{marchandId}")
public Long getComptage(@PathVariable String marchandId) {
  ReadOnlyWindowStore<String, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType(
      "comptage-marchand-1min",
      QueryableStoreTypes.windowStore()));
  Instant now = Instant.now();
  WindowStoreIterator<Long> iter = store.fetch(
    marchandId, now.minus(Duration.ofMinutes(1)), now);
  long total = 0;
  while (iter.hasNext()) total += iter.next().value;
  return total;
}

الخطوة 6 — Dead Letter Queue

الرسائل التي تفشل deserialization أو معالجتها لا يجب أن تكسر الـ topology. النمط الموصى به: DeserializationExceptionHandler مخصّص يُعيد توجيه الأخطاء نحو topic DLQ.

public class DLQHandler implements DeserializationExceptionHandler {
  @Override
  public DeserializationHandlerResponse handle(
      ProcessorContext ctx, ConsumerRecord<byte[], byte[]> record, Exception ex) {
    // أرسل إلى DLQ topic
    dlqProducer.send(new ProducerRecord<>("paiements-dlq", record.key(), record.value()));
    return DeserializationHandlerResponse.CONTINUE;
  }
}

props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
  DLQHandler.class.getName());

الخطوة 7 — Scaling أفقي

لتوسيع تطبيق Kafka Streams، شغّل ببساطة نسخاً متعدّدة بنفس application.id. Kafka يُوزّع تلقائياً partitions topic المصدر بين النسخ. على cluster بـ 12 partition، يمكنك تشغيل حتى 12 نسخة متوازية — كل نسخة تأخذ partition.

عدد threads داخلية لكل نسخة يُضبط بـ num.stream.threads=4. على آلة 8 cores، 4 threads يُعطيان توازنا جيداً.

أخطاء شائعة

الخطأ السبب الحل
State store غير مُهيّأ التطبيق لم يكمل الإقلاع انتظر KafkaStreams.State.RUNNING
Disque مشبَّع بـ RocksDB state store ضخم اضبط RocksDBConfigSetter أو reduce cache.max.bytes.buffering
كمون عالي على jointure topic مرجعي غير منسوخ في memory إفرض GlobalKTable بدل KTable إن كان الحجم محدوداً
Rebalance طويل بروتوكول قديم فعّل group.protocol=streams (4.2)
OffsetOutOfRangeException retention topic أقصر من بطء consumer زِد retention أو سرّع المعالجة

متى Kafka Streams ومتى Flink؟

Kafka Streams مثالي للتطبيقات التي تستهلك من Kafka، تعالج، وتعيد النشر على Kafka — بلا dépendance خارجية. Flink يتقدّم على CEP المعقّد، النوافذ المتقدّمة، إدارة زمن الحدث الصارمة، وأحمال > 100 000 حدث/ثانية لكل task. للأغلبية الساحقة من احتياجات PME وstartup، Kafka Streams يكفي بفارق كبير.

مقالات ذات صلة

Sponsoriser ce contenu

Cet emplacement est à vous

Position premium en fin d'article — c'est l'instant où les lecteurs sont le plus engagés. Réservez cet espace pour votre marque, votre formation ou votre offre.

Recevoir nos tarifs
Publicité