السلسلة: هذا الدرس جزء من سلسلة Kafka 4.2. اقرأ المقال الرئيسي.
حين يكبر cluster Kafka، أول أعراض الألم ليست الكمون أو الـ throughput — إنها عقد البيانات. فريق يُضيف حقلاً لحدث، ينسى إخطار المستهلكين، ونصف SI ينهار. Schema Registry يحلّ هذا بفرض انضباط: كل producer يجب أن يُسجّل schema رسائله، وكل consumer يستلم ضمان أن schema سيكون متوافقاً.
المتطلبات
- Cluster Kafka 4.2 شغّال
- Docker 27+ أو Java 21 لتنفيذ Confluent Schema Registry
- Maven 3.9 أو Gradle 8.10 لمشروع Java
- 80 دقيقة
الخطوة 1 — فهم Avro وعقد البيانات
Avro صيغة تسلسل ثنائية مدمجة. تختلف عن JSON بثلاث خصائص: ثنائية (الأرقام والـ booleans تأخذ بعض البتات لا octets ASCII)، typée بصرامة، وتفصل schema عن البيانات (schema لا يُكرَّر مع كل رسالة). لرسالة paiement موبايل نمطية، حجم Avro 40-60% أقل من JSON المكافئ.
لكن الفائدة الحقيقية لـ Avro ليست الحجم — إنها التوافق في الزمن. ثلاث أوضاع توافق: Backward (schema جديد يقرأ رسائل قديمة)، Forward (schema قديم يقرأ رسائل جديدة)، Full (الاثنان). Schema Registry يرفض التطوّرات التي تكسر التوافق المُعد.
الخطوة 2 — إطلاق Confluent Schema Registry
# docker-compose.yml
services:
schema-registry:
image: confluentinc/cp-schema-registry:8.2.0
container_name: schema-registry
restart: unless-stopped
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1.example.com:9092,PLAINTEXT://kafka2.example.com:9092
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: BACKWARD
docker compose up -d schema-registry
curl -s http://localhost:8081/subjects
# المخرَج: [] (لا سبجكت بعد)
Schema Registry خدمة stateless تخزّن بياناتها في topic داخلي للـ cluster Kafka، مسمّى افتراضياً _schemas. الحماية بنسخ topic (factor 3 افتراضي).
الخطوة 3 — تعريف schema Avro للـ paiements
// src/main/avro/Payment.avsc
{
"namespace": "io.itskillscenter.kafka.avro",
"type": "record",
"name": "Payment",
"fields": [
{ "name": "id", "type": "string", "doc": "Identifiant unique" },
{ "name": "client", "type": "string", "doc": "Numero international" },
{ "name": "montant", "type": "long", "doc": "Montant en centimes" },
{ "name": "moyen", "type": { "type": "enum", "name": "MoyenPaiement",
"symbols": ["MADA", "STC_PAY", "VODAFONE_CASH", "ORANGE_MAROC", "BENEFIT_BAHRAIN"] } },
{ "name": "timestamp", "type": { "type": "long", "logicalType": "timestamp-millis" } },
{ "name": "marchand_id", "type": ["null", "string"], "default": null }
]
}
التفاصيل: "type": "long" مع logicalType: timestamp-millis يدلّ Avro معاملة الحقل كتاريخ. الـ union ["null", "string"] مع default: null يجعل الحقل اختيارياً ومستقرّاً للتطوّرات. enum على وسائل الدفع يتجنّب الأخطاء الإملائية.
الخطوة 4 — توليد classes Java من schema
<!-- pom.xml -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>8.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.12.0</version>
</dependency>
<build>
<plugins>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.12.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals><goal>schema</goal></goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
mvn compile يُولّد ملف Payment.java في target/generated-sources/avro. الـ class immutable وthread-safe، تكشف builder fluent يُصادق آلياً على التوافق مع schema.
الخطوة 5 — Producer Avro مع تسجيل آلي
الـ serializer Confluent يدير كل شيء خلفياً: يُسجّل schema لدى Schema Registry عند أول إرسال، يستردّ المعرّف، ويُسبّق كل رسالة بهذا المعرّف (4 octets) + الـ magic byte (1 octet).
package io.itskillscenter.kafka.avro;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Instant;
import java.util.Properties;
public class AvroPaymentProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");
try (KafkaProducer<String, Payment> producer = new KafkaProducer<>(props)) {
Payment paiement = Payment.newBuilder()
.setId("PAY-2026-001")
.setClient("+966501234567")
.setMontant(15000L)
.setMoyen(MoyenPaiement.MADA)
.setTimestamp(Instant.now().toEpochMilli())
.setMarchandId("BOUTIQUE-RIYADH-42")
.build();
ProducerRecord<String, Payment> record =
new ProducerRecord<>("paiements-avro", paiement.getClient().toString(), paiement);
producer.send(record).get();
System.out.println("Publie: " + paiement.getId());
} catch (Exception e) { e.printStackTrace(); }
}
}
عند أول send، client Confluent يستدعي POST /subjects/paiements-avro-value/versions على Registry، الذي يُرجع {"id": 1}. كل الرسائل التالية تستخدم نفس المعرّف 1.
الخطوة 6 — Consumer Avro وdésérialisation typée
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "comptable-paiements-v1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");
props.put("specific.avro.reader", "true");
try (KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(List.of("paiements-avro"));
while (true) {
ConsumerRecords<String, Payment> recs = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, Payment> r : recs) {
Payment p = r.value();
System.out.printf("%s -> %d via %s%n",
p.getId(), p.getMontant(), p.getMoyen());
}
}
}
specific.avro.reader=true هو ما يصنع الفرق بين كود نظيف وقاموس حقول مسمّاة للمعالجة يدوياً.
الخطوة 7 — تطوير schema بلا كسر المستهلكين
بعد 6 أشهر، الأعمال تطلب إضافة حقل devise. القاعدة البسيطة: حقل جديد يجب أن يكون له قيمة افتراضية، ما يجعله backward-compatible.
// Payment.avsc — الإصدار 2
{
...
"fields": [
... // الحقول الموجودة
{ "name": "devise", "type": "string", "default": "SAR" }
]
}
default: "SAR" يُخبر schema reader: إن وصلت رسالة بلا هذا الحقل، اعتبرها SAR. Registry يقبل هذا التطوير. حذف حقل إلزامي أو تغيير نوعه يُسبّب رفض HTTP 409 «Schema being registered is incompatible».
أخطاء شائعة
| الخطأ | السبب | الحل |
|---|---|---|
| HTTP 409 عند التسجيل | تطوير غير متوافق | أضف default أو غيّر سياسة السبجكت إلى NONE |
SerializationException: Unknown magic byte |
topic يخلط Avro وJSON | أنشئ topic مخصّصاً لـ Avro |
RestClientException: 401 |
Registry محمي بـ basic auth | أضف basic.auth.credentials.source=USER_INFO |
| Topic موجود لكن سبجكت غائب | Producer لم ينشر أبداً | افرض أول إرسال أو سجّل schema يدوياً عبر curl |
| كمون مرتفع عند الإقلاع | cache client Schema Registry بارد | فعّل auto.register.schemas=false في prod وحمّل schemas مسبقاً |
بدائل مفتوحة: Karapace وApicurio
Confluent Schema Registry يفرض Confluent Community License. Karapace، من Aiven، drop-in بـ Python متوافق API حتى الإصدار 6.1.1. أقل استهلاكاً وأسرع. Apicurio Registry، من Red Hat، أعمّ ويُديم البيانات في PostgreSQL.
Production: ما يجب تعطيله
في dev، auto.register.schemas=true عملي. في الإنتاج، هذا بالضبط ما لا يجب فعله. الممارسة الموصى بها auto.register.schemas=false جانب producer وworkflow تسجيل صريح.
# .github/workflows/schema-check.yml
on: pull_request
jobs:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Test compatibility
run: |
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @schema-payload.json \
https://schema-registry.example.com/compatibility/subjects/paiements-avro-value/versions/latest
الإرجاع {"is_compatible": true} يدلّ على أن الإصدار الجديد قابل للنشر بلا كسر المستهلكين. false يحجب merge آلياً. هذه الممارسة تقتل وحدها 90% من حوادث الإنتاج المرتبطة بـ schemas.