تطوير الويب

Confluent Schema Registry وAvro مع Kafka 4.2: درس Java

6 دقائق للقراءة

السلسلة: هذا الدرس جزء من سلسلة Kafka 4.2. اقرأ المقال الرئيسي.

حين يكبر cluster Kafka، أول أعراض الألم ليست الكمون أو الـ throughput — إنها عقد البيانات. فريق يُضيف حقلاً لحدث، ينسى إخطار المستهلكين، ونصف SI ينهار. Schema Registry يحلّ هذا بفرض انضباط: كل producer يجب أن يُسجّل schema رسائله، وكل consumer يستلم ضمان أن schema سيكون متوافقاً.

المتطلبات

  • Cluster Kafka 4.2 شغّال
  • Docker 27+ أو Java 21 لتنفيذ Confluent Schema Registry
  • Maven 3.9 أو Gradle 8.10 لمشروع Java
  • 80 دقيقة

الخطوة 1 — فهم Avro وعقد البيانات

Avro صيغة تسلسل ثنائية مدمجة. تختلف عن JSON بثلاث خصائص: ثنائية (الأرقام والـ booleans تأخذ بعض البتات لا octets ASCII)، typée بصرامة، وتفصل schema عن البيانات (schema لا يُكرَّر مع كل رسالة). لرسالة paiement موبايل نمطية، حجم Avro 40-60% أقل من JSON المكافئ.

لكن الفائدة الحقيقية لـ Avro ليست الحجم — إنها التوافق في الزمن. ثلاث أوضاع توافق: Backward (schema جديد يقرأ رسائل قديمة)، Forward (schema قديم يقرأ رسائل جديدة)، Full (الاثنان). Schema Registry يرفض التطوّرات التي تكسر التوافق المُعد.

الخطوة 2 — إطلاق Confluent Schema Registry

# docker-compose.yml
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:8.2.0
    container_name: schema-registry
    restart: unless-stopped
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1.example.com:9092,PLAINTEXT://kafka2.example.com:9092
      SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: BACKWARD
docker compose up -d schema-registry
curl -s http://localhost:8081/subjects
# المخرَج: [] (لا سبجكت بعد)

Schema Registry خدمة stateless تخزّن بياناتها في topic داخلي للـ cluster Kafka، مسمّى افتراضياً _schemas. الحماية بنسخ topic (factor 3 افتراضي).

الخطوة 3 — تعريف schema Avro للـ paiements

// src/main/avro/Payment.avsc
{
  "namespace": "io.itskillscenter.kafka.avro",
  "type": "record",
  "name": "Payment",
  "fields": [
    { "name": "id", "type": "string", "doc": "Identifiant unique" },
    { "name": "client", "type": "string", "doc": "Numero international" },
    { "name": "montant", "type": "long", "doc": "Montant en centimes" },
    { "name": "moyen", "type": { "type": "enum", "name": "MoyenPaiement",
        "symbols": ["MADA", "STC_PAY", "VODAFONE_CASH", "ORANGE_MAROC", "BENEFIT_BAHRAIN"] } },
    { "name": "timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "marchand_id", "type": ["null", "string"], "default": null }
  ]
}

التفاصيل: "type": "long" مع logicalType: timestamp-millis يدلّ Avro معاملة الحقل كتاريخ. الـ union ["null", "string"] مع default: null يجعل الحقل اختيارياً ومستقرّاً للتطوّرات. enum على وسائل الدفع يتجنّب الأخطاء الإملائية.

الخطوة 4 — توليد classes Java من schema

<!-- pom.xml -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>8.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.12.0</version>
</dependency>

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-maven-plugin</artifactId>
      <version>1.12.0</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals><goal>schema</goal></goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>

mvn compile يُولّد ملف Payment.java في target/generated-sources/avro. الـ class immutable وthread-safe، تكشف builder fluent يُصادق آلياً على التوافق مع schema.

الخطوة 5 — Producer Avro مع تسجيل آلي

الـ serializer Confluent يدير كل شيء خلفياً: يُسجّل schema لدى Schema Registry عند أول إرسال، يستردّ المعرّف، ويُسبّق كل رسالة بهذا المعرّف (4 octets) + الـ magic byte (1 octet).

package io.itskillscenter.kafka.avro;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Instant;
import java.util.Properties;

public class AvroPaymentProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    props.put("schema.registry.url", "http://localhost:8081");

    try (KafkaProducer<String, Payment> producer = new KafkaProducer<>(props)) {
      Payment paiement = Payment.newBuilder()
        .setId("PAY-2026-001")
        .setClient("+966501234567")
        .setMontant(15000L)
        .setMoyen(MoyenPaiement.MADA)
        .setTimestamp(Instant.now().toEpochMilli())
        .setMarchandId("BOUTIQUE-RIYADH-42")
        .build();

      ProducerRecord<String, Payment> record =
        new ProducerRecord<>("paiements-avro", paiement.getClient().toString(), paiement);
      producer.send(record).get();
      System.out.println("Publie: " + paiement.getId());
    } catch (Exception e) { e.printStackTrace(); }
  }
}

عند أول send، client Confluent يستدعي POST /subjects/paiements-avro-value/versions على Registry، الذي يُرجع {"id": 1}. كل الرسائل التالية تستخدم نفس المعرّف 1.

الخطوة 6 — Consumer Avro وdésérialisation typée

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "comptable-paiements-v1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true");

try (KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
  consumer.subscribe(List.of("paiements-avro"));
  while (true) {
    ConsumerRecords<String, Payment> recs = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, Payment> r : recs) {
      Payment p = r.value();
      System.out.printf("%s -> %d via %s%n",
        p.getId(), p.getMontant(), p.getMoyen());
    }
  }
}

specific.avro.reader=true هو ما يصنع الفرق بين كود نظيف وقاموس حقول مسمّاة للمعالجة يدوياً.

الخطوة 7 — تطوير schema بلا كسر المستهلكين

بعد 6 أشهر، الأعمال تطلب إضافة حقل devise. القاعدة البسيطة: حقل جديد يجب أن يكون له قيمة افتراضية، ما يجعله backward-compatible.

// Payment.avsc — الإصدار 2
{
  ...
  "fields": [
    ... // الحقول الموجودة
    { "name": "devise", "type": "string", "default": "SAR" }
  ]
}

default: "SAR" يُخبر schema reader: إن وصلت رسالة بلا هذا الحقل، اعتبرها SAR. Registry يقبل هذا التطوير. حذف حقل إلزامي أو تغيير نوعه يُسبّب رفض HTTP 409 «Schema being registered is incompatible».

أخطاء شائعة

الخطأ السبب الحل
HTTP 409 عند التسجيل تطوير غير متوافق أضف default أو غيّر سياسة السبجكت إلى NONE
SerializationException: Unknown magic byte topic يخلط Avro وJSON أنشئ topic مخصّصاً لـ Avro
RestClientException: 401 Registry محمي بـ basic auth أضف basic.auth.credentials.source=USER_INFO
Topic موجود لكن سبجكت غائب Producer لم ينشر أبداً افرض أول إرسال أو سجّل schema يدوياً عبر curl
كمون مرتفع عند الإقلاع cache client Schema Registry بارد فعّل auto.register.schemas=false في prod وحمّل schemas مسبقاً

بدائل مفتوحة: Karapace وApicurio

Confluent Schema Registry يفرض Confluent Community License. Karapace، من Aiven، drop-in بـ Python متوافق API حتى الإصدار 6.1.1. أقل استهلاكاً وأسرع. Apicurio Registry، من Red Hat، أعمّ ويُديم البيانات في PostgreSQL.

Production: ما يجب تعطيله

في dev، auto.register.schemas=true عملي. في الإنتاج، هذا بالضبط ما لا يجب فعله. الممارسة الموصى بها auto.register.schemas=false جانب producer وworkflow تسجيل صريح.

# .github/workflows/schema-check.yml
on: pull_request
jobs:
  check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Test compatibility
        run: |
          curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
               --data @schema-payload.json \
               https://schema-registry.example.com/compatibility/subjects/paiements-avro-value/versions/latest

الإرجاع {"is_compatible": true} يدلّ على أن الإصدار الجديد قابل للنشر بلا كسر المستهلكين. false يحجب merge آلياً. هذه الممارسة تقتل وحدها 90% من حوادث الإنتاج المرتبطة بـ schemas.

مقالات ذات صلة

Sponsoriser ce contenu

Cet emplacement est à vous

Position premium en fin d'article — c'est l'instant où les lecteurs sont le plus engagés. Réservez cet espace pour votre marque, votre formation ou votre offre.

Recevoir nos tarifs
Publicité