تطوير الويب

Redis Streams: event log وconsumer groups خطوة بخطوة

4 min de lecture

📌 المقال الرئيسي: Redis 8: caching، queues، pub/sub، streams

Streams Redis، المُقَدَّمة في 5.0، تُحَوِّل Redis إلى broker أحداث دائم — بديل بسيط وعالي الأداء لـ Apache Kafka للأحجام دون التيرابايت. خلافًا لـ pub/sub، كلّ حدث يُخَزَّن دائمًا، مُفَهرَس بمُعَرِّف زمني فريد، ويمكن استهلاكه من عدّة consumer groups مستقلّة مع ضمان التسليم. يبني هذا الدليل pipeline event-driven بـ consumer groups، إدارة pending messages، واستراتيجية retry.

المتطلّبات

  • Redis 8 يعمل.
  • Node.js 22 LTS أو أيّ عميل (Python، Go، Java).
  • قراءة مفاهيم pub/sub للمقارنة.
  • الوقت: 80 دقيقة.

الخطوة 1 — إنشاء stream وإضافة أحداث

redis-cli --user appuser -a "$REDIS_PASSWORD"

# إضافة أحداث إلى stream 'commandes'
> XADD commandes * id 1001 userId salim total 50000 produit "laptop"
"1715985432000-0"
> XADD commandes * id 1002 userId mariam total 8500 produit "souris"
"1715985441000-0"
> XADD commandes * id 1003 userId ahmed total 120000 produit "ecran"
"1715985456000-0"

# قراءة كلّ الأحداث منذ البداية
> XRANGE commandes - +

# طول stream
> XLEN commandes
(integer) 3

النجمة * تطلب من Redis توليد ID تلقائيًّا بصيغة milliseconds-sequence. هذا الـ ID متزايد بصرامة ممّا يضمن الترتيب الزمني. payload قاموس اعتباطي (حتى 512 ميغا تراكميًّا لكلّ حدث).

الخطوة 2 — قراءة الأحداث في وضع pull (XREAD)

// consumer-simple.js
import Redis from 'ioredis';
const redis = new Redis({ username:'appuser', password:process.env.REDIS_PASSWORD });

let dernierId = '$'; // '$' = آخر حدث موجود عند الانطلاق

while (true) {
  const result = await redis.xread('BLOCK', 5000, 'STREAMS', 'commandes', dernierId);
  if (!result) {
    console.log('Aucun événement en 5s, on continue...');
    continue;
  }
  const [stream, events] = result[0];
  for (const [id, fields] of events) {
    const data = {};
    for (let i = 0; i < fields.length; i += 2) data[fields[i]] = fields[i+1];
    console.log('Evenement ' + id + ':', data);
    dernierId = id;
  }
}

هذا المُستهلِك يقرأ كلّ الأحداث الجديدة. الحدّ: إن انهار وأعيد تشغيله، يفوّت أحداثًا. الحلّ: consumer groups.

الخطوة 3 — إنشاء consumer group

# إنشاء consumer group باسم 'traitement' ينطلق من نهاية stream
> XGROUP CREATE commandes traitement $
OK

# إدراج consumer groups
> XINFO GROUPS commandes

الـ $ عند الإنشاء يعني "البدء من الأحداث التالية". لإعادة التشغيل من البداية، استعمل 0 بدله.

الخطوة 4 — Worker في consumer group

// worker-stream.js
import Redis from 'ioredis';
const redis = new Redis({ username:'appuser', password:process.env.REDIS_PASSWORD });

const STREAM   = 'commandes';
const GROUP    = 'traitement';
const CONSUMER = 'worker-' + process.pid;

while (true) {
  try {
    const result = await redis.xreadgroup(
      'GROUP', GROUP, CONSUMER,
      'COUNT', 10,
      'BLOCK', 5000,
      'STREAMS', STREAM, '>'
    );
    if (!result) continue;

    const [_streamName, events] = result[0];
    for (const [id, fields] of events) {
      try {
        const data = {};
        for (let i = 0; i < fields.length; i += 2) data[fields[i]] = fields[i+1];
        console.log('[' + CONSUMER + '] Traite ' + id + ':', data);

        // معالجة العمل
        await traiter(data);

        // إقرار
        await redis.xack(STREAM, GROUP, id);
      } catch (err) {
        console.error('Erreur sur ' + id + ':', err.message);
        // لا ACK: الحدث يبقى pending وسيُعاد محاولته
      }
    }
  } catch (err) {
    console.error('Erreur boucle:', err.message);
    await new Promise(r => setTimeout(r, 1000));
  }
}

async function traiter(donnees) {
  await new Promise(r => setTimeout(r, 200));
  if (Math.random() < 0.05) throw new Error('Erreur transitoire simulée');
}

الرمز > في XREADGROUP يعني "اقرأ فقط الأحداث التي لم تُسَلَّم لهذا consumer group". XACK في نهاية المعالجة يُعَلِّم الحدث كمُقَرّ؛ بدونه، يبقى في PEL (Pending Entries List) ويُعاد محاولته.

الخطوة 5 — تشغيل عدّة workers

# Terminal 1
node worker-stream.js
# Terminal 2
node worker-stream.js
# Terminal 3
node worker-stream.js
for i in {1..30}; do
  redis-cli --user appuser -a "$REDIS_PASSWORD" \
    XADD commandes '*' id "$i" userId "user$i" total "$((RANDOM % 100000))"
  sleep 0.2
done

الـ 30 حدثًا مُوَزَّعة بين الـ 3 workers، نحو 10 لكلّ. إن قتلت worker (Ctrl+C)، الأحداث في تنفيذه تبقى في PEL.

الخطوة 6 — استعادة الرسائل المحجوبة (XPENDING + XCLAIM)

// recovery.js — للتنفيذ دوريًّا
async function recovery() {
  // إدراج الأحداث pending منذ أكثر من 30 ثانية
  const pending = await redis.xpending(STREAM, GROUP, 'IDLE', 30000, '-', '+', 100);

  for (const [id, consumer, msIdle, deliveryCount] of pending) {
    if (deliveryCount > 5) {
      // محاولات كثيرة: إرسال إلى dead letter
      await redis.xadd('dead-letter', '*', 'originalId', id, 'consumer', consumer, 'deliveryCount', deliveryCount);
      await redis.xack(STREAM, GROUP, id);
      console.warn('DLQ : ' + id);
      continue;
    }

    // Reclaim: نقل إلى هذا worker
    const claimed = await redis.xclaim(STREAM, GROUP, CONSUMER, 30000, id);
    if (claimed.length > 0) {
      console.log('Réclamé ' + id + ' (idle ' + msIdle + 'ms, ' + deliveryCount + ' tentatives)');
    }
  }
}

setInterval(recovery, 10000);

XPENDING يُدرج الأحداث pending؛ XCLAIM ينقل الملكية إلى consumer آخر. القاعدة "إن deliveryCount > 5، أرسل إلى dead letter" تتفادى حلقات لا نهائية على حدث مسموم.

الخطوة 7 — تحديد حجم stream

// تحديد بنحو مليون حدث
await redis.xadd('commandes', 'MAXLEN', '~', 1000000, '*', 'id', '1001', 'userId', 'salim');

// أو حذف ما قبل timestamp معطى
const il_y_a_24h = Date.now() - 24 * 3600 * 1000;
await redis.xtrim('commandes', 'MINID', '~', il_y_a_24h);

المُشَغِّل ~ يعني "تقريبًا": Redis قد يحفظ أكثر قليلًا لتحسين الأداء. لحدّ صارم، استعمل =.

الخطوة 8 — المقارنة مع Apache Kafka

Kafka Redis Streams
Topic Stream (مفتاح Redis)
Partition Stream وحيد (لا partitioning أصلي داخل stream)
Offset ID Redis Stream (timestamp-séquence)
Consumer group Consumer group (مفهوم مطابق)
Commit offset XACK
Log compaction غير متاح (استعمل XTRIM بالعمر)
Réplication multi-broker نسخ Redis كلاسيكي
Schema Registry (Avro) لا — JSON خامّ

لأحجام دون التيرابايت/يوم، Redis Streams يُغَطّي 90% من الاحتياجات الحدثية ببساطة تشغيلية أعلى محسوسًا. ما فوق ذلك، Kafka يبقى الأداة الصحيحة.

الخطوة 9 — حالة استعمال: feed نشاط مستخدم

// Producer من API
app.post('/api/like', async (req, res) => {
  await redis.xadd('activity', '*',
    'type',     'like',
    'userId',   req.user.id,
    'targetId', req.body.postId,
    'ts',       Date.now()
  );
  res.json({ ok: true });
});

// Consumer A: feed نشاط لحظي
group: 'feed-builder' --> يُحَدِّث timeline كلّ متابع

// Consumer B: تحليلات
group: 'analytics' --> يُجَمِّع بالساعة ويُخَزِّن في PostgreSQL

// Consumer C: ML
group: 'ml-recommandations' --> يُغَذِّي نموذج توصيات

كلّ consumer group يستهلك مستقلًّا، بإيقاعه الخاصّ. هذا الأصل النموذجي لمعمارية event-driven.

أخطاء شائعة

الخطأ السبب الحلّ
أحداث لم تُستهلَك أبدًا Consumer group مُنشأ بعد XADD بـ $ أعد إنشاء الـ group بـ XGROUP CREATE stream group 0
PEL ينمو بلا حدود workers لا تُنَفِّذ XACK تحقّق من استدعاء XACK في فرع النجاح
زمن استجابة مرتفع BLOCK 0 لا نهائي استعمل BLOCK 5000 ولُف
Memory leak Redis Stream بلا حدّ، MAXLEN لم يُطَبَّق هَيِّئ job cron لـ XTRIM دوريًّا
أحداث مُعالَجة مرّتين worker ينهار بين traiter() وXACK اجعل traiter idempotent

الأدلّة التالية

🔝 العودة للدليل الرئيسي

FAQ

Streams أم BullMQ لمعالجة jobs؟ BullMQ لـ jobs بـ retry، priority، scheduling cron. Streams لأحداث عمل خامّ حيث عدّة مستهلكين مستقلّين يجب أن يُعالجوا نفس التدفّق (event sourcing، fan-out).

كيف نُجَزِّئ للتوسيع؟ Redis Streams لا يدعم partitioning داخل stream. للتوسيع: (1) sharder بمفتاح تطبيقي بإنشاء N streams، (2) توجيه كلّ حدث بحسب hash(userId)، (3) N consumer groups مستقلّة.

أيّ تنسيق للـ payload؟ حقول كلّ-قيمة مسطّحة إن كانت البنية بسيطة. JSON مُرَمَّز في حقل وحيد payload للتكتيب الغني. للتطوّر الحرج، JSON Schema أو Protobuf.

كم streams أقصى؟ لا حدّ صارم في Redis — مجرّد مفاتيح. بعدّة آلاف من streams على نفس Redis، الأداء ممتاز.

مراجع

  • Streams Redis — التوثيق الرسمي
  • أمر XADD
  • أمر XREADGROUP
  • أمر XCLAIM
Sponsoriser ce contenu

Cet emplacement est à vous

Position premium en fin d'article — c'est l'instant où les lecteurs sont le plus engagés. Réservez cet espace pour votre marque, votre formation ou votre offre.

Recevoir nos tarifs
Publicité