السلسلة: هذا الدرس جزء من سلسلة Kafka 4.2. اقرأ المقال الرئيسي.
Change Data Capture (CDC) يُحوّل قاعدة PostgreSQL إلى مصدر أحداث: كل INSERT، UPDATE، DELETE في القاعدة يصير رسالة Kafka، بلا لمس كود التطبيق. Debezium 3.4.0.Final (16 ديسمبر 2025) المرجع — يدعم PostgreSQL 14-18، MySQL، MongoDB، SQL Server وOracle. الكمون النمطي بين INSERT وارتفاع الرسالة على Kafka: 50-200 مللي ثانية.
المتطلبات
- Cluster Kafka 4.2 شغّال
- PostgreSQL 16 أو 17 مع
wal_level=logical - Docker لتنفيذ Kafka Connect
- 60 دقيقة
الخطوة 1 — ضبط PostgreSQL للنسخ المنطقي
# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
-- إنشاء role replication + publication
CREATE ROLE debezium WITH REPLICATION LOGIN PASSWORD 'secret';
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER TABLE paiements REPLICA IDENTITY FULL;
CREATE PUBLICATION dbz_publication FOR TABLE paiements;
REPLICA IDENTITY FULL يجعل PostgreSQL يكتب القيم القديمة في WAL عند UPDATE/DELETE، ما يسمح بحقل before كامل في الحدث Debezium.
الخطوة 2 — إطلاق Kafka Connect
# docker-compose.yml
services:
connect:
image: confluentinc/cp-kafka-connect:8.2.0
ports: ["8083:8083"]
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka1.example.com:9092
CONNECT_GROUP_ID: connect-cluster
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
volumes:
- ./plugins:/usr/share/confluent-hub-components
نزّل plugin Debezium في ./plugins/debezium-connector-postgres من debezium.io/releases/3.4.
الخطوة 3 — إنشاء connecteur Debezium
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "postgres-paiements",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "appdb",
"topic.prefix": "dbz",
"plugin.name": "pgoutput",
"publication.name": "dbz_publication",
"slot.name": "debezium_slot",
"table.include.list": "public.paiements",
"snapshot.mode": "initial",
"tombstones.on.delete": "true"
}
}'
plugin.name=pgoutput الخيار الموصى به في 2026 — مدمج في PostgreSQL منذ 10، أداء أفضل من wal2json. snapshot.mode=initial يستخرج كل البيانات الموجودة قبل الانتقال إلى streaming WAL. الـ topics المُنشأة تتبع نمط dbz.public.paiements.
الخطوة 4 — قراءة الأحداث CDC
kafka-console-consumer.sh \
--bootstrap-server kafka1.example.com:9092 \
--topic dbz.public.paiements \
--from-beginning | jq
كل حدث Debezium له بنية موحَّدة: before (الحالة قبل، null لـ INSERT)، after (الحالة بعد، null لـ DELETE)، source (metadata قاعدة)، op (c=create، u=update، d=delete، r=read snapshot).
{
"before": null,
"after": {
"id": "PAY-001",
"client": "+221771234567",
"montant": 15000,
"moyen": "WAVE"
},
"source": { "db": "appdb", "table": "paiements", "ts_ms": 1737205200000 },
"op": "c",
"ts_ms": 1737205200042
}
الخطوة 5 — SMT (Single Message Transforms) لإخفاء الحساس
SMT تطبّق تحويلات بسيطة على الرسائل خلال مرورها في Connect — بلا كتابة كود. مثال: إخفاء رقم العميل الكامل قبل الإرسال.
"transforms": "MaskClient",
"transforms.MaskClient.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.MaskClient.fields": "client",
"transforms.MaskClient.replacement": "***MASKED***"
الخطوة 6 — مراقبة slot النسخ
SELECT slot_name, active, restart_lsn, pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;
إن نما lag بلا توقّف، إما connecteur متوقّف أو حركة WAL أسرع من الاستهلاك. خطر كبير: WAL يتراكم على القرص ولا يُحرَّر طالما slot لم يستهلكه.
الخطوة 7 — استهلاك CDC في تطبيق Java
// مثال: مزامنة Elasticsearch
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1.example.com:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "sync-elastic");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put("schema.registry.url", "http://localhost:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("dbz.public.paiements"));
while (true) {
for (ConsumerRecord<String, GenericRecord> rec : consumer.poll(Duration.ofSeconds(1))) {
String op = rec.value().get("op").toString();
GenericRecord after = (GenericRecord) rec.value().get("after");
switch (op) {
case "c": case "u": elasticClient.index(after); break;
case "d": elasticClient.delete(rec.key()); break;
}
}
consumer.commitSync();
}
أخطاء شائعة
| الخطأ | السبب | الحل |
|---|---|---|
logical decoding requires wal_level=logical |
إعداد PostgreSQL غير ملائم | عدّل postgresql.conf وأعد التشغيل |
| WAL ينمو بلا حدّ | slot غير مُستهلَك (connecteur متوقّف) | أزل slot المسروق أو أعد تشغيل connecteur |
أحداث before فارغة على UPDATE |
REPLICA IDENTITY DEFAULT |
ALTER TABLE ... REPLICA IDENTITY FULL |
| Snapshot يأخذ ساعات | جدول ضخم بلا snapshot.fetch.size |
اضبط snapshot.fetch.size=10000 |
| كمون CDC عالي | Connect حمل وحيد | زِد عدد tasks أو افصل Connect على cluster مخصّص |
حالات استخدام شائعة
- مزامنة فهارس بحث: PostgreSQL ← Elasticsearch/Meilisearch بلا كود تطبيقي
- تحديث cache: PostgreSQL ← Redis، invalidation آلية
- Lake data: PostgreSQL ← Iceberg/Delta على S3 للتحليلات
- تكامل microservices: نقل تدريجي من monolithe بلا انقطاع
- Audit log: تاريخ كامل للتغييرات على بنود حساسة