تطوير الويب

Pipeline CDC PostgreSQL مع Debezium 3.4 وKafka Connect

4 دقائق للقراءة

السلسلة: هذا الدرس جزء من سلسلة Kafka 4.2. اقرأ المقال الرئيسي.

Change Data Capture (CDC) يُحوّل قاعدة PostgreSQL إلى مصدر أحداث: كل INSERT، UPDATE، DELETE في القاعدة يصير رسالة Kafka، بلا لمس كود التطبيق. Debezium 3.4.0.Final (16 ديسمبر 2025) المرجع — يدعم PostgreSQL 14-18، MySQL، MongoDB، SQL Server وOracle. الكمون النمطي بين INSERT وارتفاع الرسالة على Kafka: 50-200 مللي ثانية.

المتطلبات

  • Cluster Kafka 4.2 شغّال
  • PostgreSQL 16 أو 17 مع wal_level=logical
  • Docker لتنفيذ Kafka Connect
  • 60 دقيقة

الخطوة 1 — ضبط PostgreSQL للنسخ المنطقي

# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
-- إنشاء role replication + publication
CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD 'secret';
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER TABLE paiements REPLICA IDENTITY FULL;
CREATE PUBLICATION dbz_publication FOR TABLE paiements;

REPLICA IDENTITY FULL يجعل PostgreSQL يكتب القيم القديمة في WAL عند UPDATE/DELETE، ما يسمح بحقل before كامل في الحدث Debezium.

الخطوة 2 — إطلاق Kafka Connect

# docker-compose.yml
services:
  connect:
    image: confluentinc/cp-kafka-connect:8.2.0
    ports: ["8083:8083"]
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka1.example.com:9092
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    volumes:
      - ./plugins:/usr/share/confluent-hub-components

نزّل plugin Debezium في ./plugins/debezium-connector-postgres من debezium.io/releases/3.4.

الخطوة 3 — إنشاء connecteur Debezium

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-paiements",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "debezium",
      "database.password": "secret",
      "database.dbname": "appdb",
      "topic.prefix": "dbz",
      "plugin.name": "pgoutput",
      "publication.name": "dbz_publication",
      "slot.name": "debezium_slot",
      "table.include.list": "public.paiements",
      "snapshot.mode": "initial",
      "tombstones.on.delete": "true"
    }
  }'

plugin.name=pgoutput الخيار الموصى به في 2026 — مدمج في PostgreSQL منذ 10، أداء أفضل من wal2json. snapshot.mode=initial يستخرج كل البيانات الموجودة قبل الانتقال إلى streaming WAL. الـ topics المُنشأة تتبع نمط dbz.public.paiements.

الخطوة 4 — قراءة الأحداث CDC

kafka-console-consumer.sh \
  --bootstrap-server kafka1.example.com:9092 \
  --topic dbz.public.paiements \
  --from-beginning | jq

كل حدث Debezium له بنية موحَّدة: before (الحالة قبل، null لـ INSERT)، after (الحالة بعد، null لـ DELETE)، source (metadata قاعدة)، op (c=create، u=update، d=delete، r=read snapshot).

{
  "before": null,
  "after": {
    "id": "PAY-001",
    "client": "+221771234567",
    "montant": 15000,
    "moyen": "WAVE"
  },
  "source": { "db": "appdb", "table": "paiements", "ts_ms": 1737205200000 },
  "op": "c",
  "ts_ms": 1737205200042
}

الخطوة 5 — SMT (Single Message Transforms) لإخفاء الحساس

SMT تطبّق تحويلات بسيطة على الرسائل خلال مرورها في Connect — بلا كتابة كود. مثال: إخفاء رقم العميل الكامل قبل الإرسال.

"transforms": "MaskClient",
"transforms.MaskClient.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskClient.fields": "client",
"transforms.MaskClient.replacement": "***MASKED***"

الخطوة 6 — مراقبة slot النسخ

SELECT slot_name, active, restart_lsn, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;

إن نما lag بلا توقّف، إما connecteur متوقّف أو حركة WAL أسرع من الاستهلاك. خطر كبير: WAL يتراكم على القرص ولا يُحرَّر طالما slot لم يستهلكه.

الخطوة 7 — استهلاك CDC في تطبيق Java

// مثال: مزامنة Elasticsearch
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sync-elastic");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");

KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("dbz.public.paiements"));

while (true) {
  for (ConsumerRecord<String, GenericRecord> rec : consumer.poll(Duration.ofSeconds(1))) {
    String op = rec.value().get("op").toString();
    GenericRecord after = (GenericRecord) rec.value().get("after");
    switch (op) {
      case "c": case "u": elasticClient.index(after); break;
      case "d": elasticClient.delete(rec.key()); break;
    }
  }
  consumer.commitSync();
}

أخطاء شائعة

الخطأ السبب الحل
logical decoding requires wal_level=logical إعداد PostgreSQL غير ملائم عدّل postgresql.conf وأعد التشغيل
WAL ينمو بلا حدّ slot غير مُستهلَك (connecteur متوقّف) أزل slot المسروق أو أعد تشغيل connecteur
أحداث before فارغة على UPDATE REPLICA IDENTITY DEFAULT ALTER TABLE ... REPLICA IDENTITY FULL
Snapshot يأخذ ساعات جدول ضخم بلا snapshot.fetch.size اضبط snapshot.fetch.size=10000
كمون CDC عالي Connect حمل وحيد زِد عدد tasks أو افصل Connect على cluster مخصّص

حالات استخدام شائعة

  • مزامنة فهارس بحث: PostgreSQL ← Elasticsearch/Meilisearch بلا كود تطبيقي
  • تحديث cache: PostgreSQL ← Redis، invalidation آلية
  • Lake data: PostgreSQL ← Iceberg/Delta على S3 للتحليلات
  • تكامل microservices: نقل تدريجي من monolithe بلا انقطاع
  • Audit log: تاريخ كامل للتغييرات على بنود حساسة

مقالات ذات صلة

Sponsoriser ce contenu

Cet emplacement est à vous

Position premium en fin d'article — c'est l'instant où les lecteurs sont le plus engagés. Réservez cet espace pour votre marque, votre formation ou votre offre.

Recevoir nos tarifs
Publicité