تطوير الويب

Aggregation Pipeline في MongoDB: $match و$lookup و$facet و$merge عمليًّا

3 min de lecture

الـ aggregation pipeline هو مُؤلِّف MongoDB: سلسلة عمليات تصريحية تُحوِّل مجموعة (collection) إلى جواب تجاري في استعلام واحد. ستتعرّف هنا على ثنائي SQL: SELECT … GROUP BY … HAVING، لكن أكثر تعبيرًا بكثير: تستطيع الضمّ، وفرد المصفوفات، وتنفيذ فروع متعدّدة بالتوازي، وتثبيت النتيجة في مجموعة أخرى دون مغادرة المحرّك. يشرح هذا الدليل القواعد المفيدة في الإنتاج على MongoDB 8.0 LTS ويُطبّق match stage pull-up و$lookup/$unwind coalescing، وهما تحسينان مدمجان في الإصدار 8.0.

المتطلّبات

  • MongoDB 8.0 LTS (Community أو Atlas M0).
  • mongosh 2.x أو MongoDB Compass مثبَّت.
  • مخطّط أدنى: مجموعتان commandes وproduits، عدّة مئات من المستندات لرصد سلوك كلّ مرحلة.
  • قراءة دليل النمذجة embedded vs referenced — الـ pipeline يستفيد من مخطّط مُحكَم.
  • الوقت المتوقّع: 100 دقيقة لاستكمال الخطوات التسع مع الأمثلة.

الخطوة 1 — تشريح pipeline

الـ pipeline مصفوفة من المراحل (stages) تُنفَّذ بالترتيب. كلّ مرحلة تستقبل تدفّق مستندات، تُحوّله، وتُمرّر النتيجة إلى المرحلة التالية. شكل خرج مرحلة يصبح شكل دخل التالية — كأنبوب Unix لكن للمستندات BSON.

db.commandes.aggregate([
  { $match: { statut: "payee" } },
  { $project: { client_id: 1, montant_usd: 1, mois: { $month: "$cree_le" } } },
  { $group: { _id: "$mois", ca_mensuel: { $sum: "$montant_usd" } } },
  { $sort: { _id: 1 } },
  { $limit: 12 }
]);

السلسلة أعلاه تُصفّي، تُسقط الحقول المفيدة، تُجَمِّع بالشهر، ترتّب، تُحدِّد — كما تقرأ عبارة « آخر اثني عشر رقم أعمال شهري للطلبات المدفوعة ». يُطبّق المحرّك المراحل بالترتيب المعطى لكنّه يُعيد ترتيب ما يمكن ترتيبه بذكاء: يُطبّق محرّك التجميع تلقائيًا match stage pull-up الذي يرفع $match الموضوع بعد $project ليُصفّي مبكّرًا ويقرأ مستندات أقلّ. المرجع: MongoDB 8.0 Release Notes — Query Planner.

الخطوة 2 — $match أوّلًا: الفلتر المربح

وضع $match في البداية هو القاعدة الذهبية الأولى. يقرأ الـ pipeline المجموعة عبر الفهارس المناسبة فقط حين يكون الفلتر في وضع مبكّر بما يكفي؛ بعد المرحلة الأولى، يكون المحرّك قد بدأ توطين المستندات في الذاكرة ويصير الفلترة عمل فرز بعد القراءة، لا فحص فهرس موجَّه.

// بطيء — المحرّك يقرأ كامل المجموعة قبل الفلترة
db.commandes.aggregate([
  { $addFields: { mois: { $month: "$cree_le" } } },
  { $match: { statut: "payee", cree_le: { $gte: ISODate("2026-01-01") } } }
]);

// سريع — match في المقدّمة، يُستعمَل الفهرس (statut, cree_le)
db.commandes.aggregate([
  { $match: { statut: "payee", cree_le: { $gte: ISODate("2026-01-01") } } },
  { $addFields: { mois: { $month: "$cree_le" } } }
]);

التشغيل مع explain("executionStats") على النسختين يُظهر، في الأولى، COLLSCAN على كامل الطلبات؛ في الثانية، IXSCAN على الفهرس المركّب (statut, cree_le) مع nReturned قريب من totalKeysExamined. الفارق يُحسَب بمعامل 50 على مجموعة ذات ملايين المستندات.

الخطوة 3 — $group والـ accumulators

الـ $group يُجمّع المستندات حسب مفتاح ويُطبّق accumulators على الحقول الأخرى. هو المكافئ الوظيفي لـ GROUP BY، وأكثر ثراءً: تستطيع جمع المبالغ، حساب المتوسّطات، الحدود الدنيا/العليا، لكن أيضًا الاحتفاظ بأوّل مستند من كلّ مجموعة بـ $first، تكديس القيم في مصفوفة بـ $push، وإزالة التكرار بـ $addToSet.

db.commandes.aggregate([
  { $match: { statut: "payee" } },
  { $group: {
      _id: "$client_id",
      ca_total: { $sum: "$montant_usd" },
      panier_moyen: { $avg: "$montant_usd" },
      premiere: { $min: "$cree_le" },
      derniere: { $max: "$cree_le" },
      nb_commandes: { $sum: 1 },
      produits_vus: { $addToSet: "$produit_id" }
    }
  }
]);

مستند الخرج لكلّ عميل يحوي سبعة حقول مُحسَبة مسبقًا في قراءة واحدة. لاحظ استخدام { $sum: 1 } لعدّ عناصر المجموعة: هذا هو الاصطلاح المعتمد في MongoDB، مكافئ COUNT(*) في SQL. المُشغّلان $percentile و$median المُقدَّمان في MongoDB 7.0 متاحان هنا أيضًا لحساب زمن الاستجابة أو التوزيعات دون فرز المجموعة بأكملها. المرجع: $group reference.

الخطوة 4 — $lookup لضمّ المجموعات

الـ $lookup هو مكافئ LEFT OUTER JOIN في SQL. يُغني كلّ مستند من التدفّق بمصفوفة مستندات متطابقة من مجموعة أخرى. وسّع MongoDB 8.0 قدراته: يعمل $lookup الآن على مجموعات sharded داخل معاملة، وهو ما لم يكن مسموحًا قبلًا.

db.commandes.aggregate([
  { $match: { statut: "payee" } },
  { $lookup: {
      from: "clients",
      localField: "client_id",
      foreignField: "_id",
      as: "client"
    }
  },
  { $unwind: "$client" },
  { $project: {
      _id: 0,
      commande_id: "$_id",
      montant: "$montant_usd",
      client_nom: "$client.nom",
      client_email: "$client.email"
    }
  }
]);

نتيجة $lookup هي دائمًا مصفوفة، ولهذا يلي $unwind لتسطيحها حين تعلم أنّ التطابق واحد على الأكثر. قاعدة الأداء: يجب أن تكون المجموعة المرجَّع إليها مُفَهرسة على foreignField. بدون هذا الفهرس، كلّ مستند من التدفّق الرئيسي يُشعل COLLSCAN على المجموعة المرتبطة — يصبح التجميع تربيعيًّا وينهار من بضعة آلاف من الطلبات.

الخطوة 5 — $unwind لفرد مصفوفة

يأخذ $unwind حقلًا من نوع مصفوفة ويُنتج مستندًا لكلّ عنصر. هذه هي المرحلة التي تُحوّل مستند « طلب بثلاثة أسطر » إلى ثلاثة مستندات « سطر واحد » — مفيدة حين يهتمّ التجميع بالعناصر فُرادى لا بالمستند الأب.

// المخطّط: الطلب يضمّ أسطره
// { _id, client_id, lignes: [{ produit_id, qte, prix }, ...] }

db.commandes.aggregate([
  { $match: { statut: "payee" } },
  { $unwind: "$lignes" },
  { $group: {
      _id: "$lignes.produit_id",
      qte_vendue: { $sum: "$lignes.qte" },
      ca: { $sum: { $multiply: ["$lignes.qte", "$lignes.prix"] } }
    }
  },
  { $sort: { ca: -1 } },
  { $limit: 10 }
]);

السلسلة $unwind → $group تُنتج قائمة أعلى 10 منتجات حقّقت أعلى رقم أعمال. يُطبّق محرّك التجميع تلقائيًا التحسين الرسمي $lookup, $unwind, and $match Coalescence: حين يلي $unwind فورًا $lookup على المفتاح ذاته، يدمج المحرّك المرحلتين، فيُلغي المصفوفة الوسيطة ويُحرِّر الذاكرة. المرجع: Aggregation Pipeline Optimization.

الخطوة 6 — $facet لأنابيب متوازية

يُتيح $facet تنفيذ عدّة أنابيب فرعية بالتوازي على نفس تدفّق الدخل. هذا ما يُحوّل صفحة كتالوج من نوع Amazon إلى استعلام واحد: قائمة منتجات مُرَقَّمة + عدّادات لكلّ فئة + شرائح أسعار + العدد الإجمالي، كلّه في نداء واحد.

db.produits.aggregate([
  { $match: { actif: true } },
  { $facet: {
      page_courante: [
        { $sort: { vues: -1 } },
        { $skip: 0 },
        { $limit: 24 },
        { $project: { nom: 1, prix_usd: 1, image: 1 } }
      ],
      facettes_categorie: [
        { $group: { _id: "$categorie", n: { $sum: 1 } } },
        { $sort: { n: -1 } }
      ],
      tranches_prix: [
        { $bucket: {
            groupBy: "$prix_usd",
            boundaries: [0, 10, 50, 200, 1000, 99999],
            default: "autre",
            output: { n: { $sum: 1 } }
          }}
      ],
      total: [ { $count: "nombre_produits" } ]
    }
  }
]);

مستند الخرج يحوي أربعة مفاتيح تُقابل الأنابيب الفرعية. على جانب العميل، تقرأ في ردّ واحد: المنتجات المعروضة، الـ facettes للشريط الجانبي، مدرّج الأسعار، والمجموع. بدون $facet، لكنّا احتجنا أربعة نداءات. المرجع: $facet reference.

الخطوة 7 — $bucket و$bucketAuto للمدرّجات

مرحلتان متخصّصتان تُنتجان مدرّجات تكرارية. الأولى، $bucket، تتطلّب توفيرك الحدود يدويًّا؛ الثانية، $bucketAuto، تختار بنفسها حدودًا متوازنة لتوزيع المستندات في N سلال متقاربة الحجم.

// مدرّج يدوي: شرائح أسعار تجارية
db.produits.aggregate([
  { $bucket: {
      groupBy: "$prix_usd",
      boundaries: [0, 5, 25, 100, 500, 5000],
      default: "hors_tranches",
      output: {
        nombre: { $sum: 1 },
        exemples: { $push: { nom: "$nom", prix: "$prix_usd" } }
      }
    }
  }
]);

// مدرّج تلقائي: 5 سلال متساوية التوزيع
db.produits.aggregate([
  { $bucketAuto: {
      groupBy: "$prix_usd",
      buckets: 5,
      output: { nombre: { $sum: 1 } }
    }
  }
]);

يرفض $bucket المستندات التي تقع قيمتها خارج الحدود، إلّا إذا حدّدت default الذي يضمّها إلى سلّة « خارج الشرائح ». $bucketAuto لا يرفض شيئًا ويُنتج حدودًا يحسبها بنفسه — عملي لاستكشاف توزيع دون معرفة طرفيه. تتطلّب المرحلتان أن يكون حقل groupBy قابلًا للمقارنة (عدد، تاريخ، سلسلة).

الخطوة 8 — $merge و$out للتثبيت

يمكن للـ pipeline كتابة نتيجته في مجموعة. مرحلتان تُتيحان ذلك: $out يستبدل المجموعة الهدف كاملةً مع كلّ تنفيذ، $merge يُطبّق upsert مستندًا بمستند ويحفظ السجلّات التي ليست في التدفّق. $merge هو الخيار الموصى به في الإنتاج لأنّه يدعم التنفيذ التزايدي — نمطيًّا، تحديث لوحة تحكّم كلّ ليلة دون إعادة قراءة التاريخ.

// تجسيد لوحة المبيعات اليومية
db.commandes.aggregate([
  { $match: { statut: "payee" } },
  { $group: {
      _id: {
        annee: { $year: "$cree_le" },
        mois: { $month: "$cree_le" },
        jour: { $dayOfMonth: "$cree_le" }
      },
      ca_usd: { $sum: "$montant_usd" },
      commandes: { $sum: 1 }
    }
  },
  { $merge: {
      into: "dashboard_ventes_jour",
      on: "_id",
      whenMatched: "replace",
      whenNotMatched: "insert"
    }
  }
]);

اللوحة الآن مجموعة مُجَسَّدة جاهزة للاستعلام من واجهة dashboard. قاعدة الحداثة بين يديك: شَغِّل الـ pipeline في cron، أو في trigger، أو على الطلب وفق الكلفة المقبولة. مجموعة dashboard_ventes_jour تُجيب في بضعة أجزاء من الثانية حيث كان الحساب المباشر يستغرق ثوانٍ. المرجع: $merge reference.

الخطوة 9 — explain() والتحسين

كما هو الحال مع الاستعلامات البسيطة، يُفحَص التجميع بـ explain(). اطلب وضع executionStats لاسترداد تفاصيل كلّ مرحلة: عدد المستندات المقروءة، المدّة، الفهارس المستعملة، الذاكرة المخصَّصة.

db.commandes.explain("executionStats").aggregate([
  { $match: { statut: "payee" } },
  { $lookup: { from: "clients", localField: "client_id", foreignField: "_id", as: "c" } },
  { $unwind: "$c" },
  { $group: { _id: "$c.ville", ca: { $sum: "$montant_usd" } } }
]);

أربع إشارات يجب مراقبتها في الخرج. واحدة: winningPlan.stage يجب أن يكون IXSCAN، لا COLLSCAN أبدًا. اثنتان: totalKeysExamined يجب أن يقترب من nReturned — وإلّا فالفهرس لا يُميِّز كفاية. ثلاث: executionTimeMillis يُعطي المدّة الكلّية للمقارنة بميزانية الـ API. أربع: حضور SBE (Slot-Based Execution) أو block processing للسلاسل الزمنية على MongoDB 8.0 — الانتقال إلى SBE يُسرِّع التجميعات نمطيًّا بـ 30 إلى 50%.

الأخطاء الشائعة

الخطأ السبب الحلّ
$match بعد $lookup فلتر متأخّر، قراءة عقيمة لكامل التدفّق ارفع $match إلى الأوّل، يتولّى الفهرس على المجموعة الرئيسية
$lookup بطيء على مجموعة ضخمة غياب فهرس على foreignField db.cible.createIndex({ champ_jointure: 1 })
QueryExceededMemoryLimitNoDiskUseAllowed مرحلة في الذاكرة تتجاوز 100 ميغا أضف { allowDiskUse: true } إلى خيار التجميع
نتيجة $facet مبتورة أنبوب فرعي يتجاوز 16 ميغا (حدّ مستند الخرج) أضف $limit في كلّ أنبوب فرعي أو جَسِّد عبر $merge
$unwind على مصفوفة فارغة يُسقط المستندات السلوك الافتراضي الخيار { path: "$champ", preserveNullAndEmptyArrays: true }

الأسئلة الشائعة

س: ما الفرق بين $lookup وJOIN في SQL؟
ج: $lookup يُرجع مصفوفة تطابقات (صفر، واحد، أو عدّة مستندات)، لا سطرًا مُسطَّحًا. أقرب إلى left outer join مُجَمَّع. للتسطيح، اتبعه بـ $unwind.

س: هل يستطيع الـ pipeline تعديل المجموعة المصدر؟
ج: لا، أبدًا. التجميع يقرأ المصدر ويكتب في مجموعة أخرى عبر $out أو $merge. إن أردت التعديل في مكانه فاستعمل db.collection.updateMany()، لا aggregation pipeline.

س: حدّ حجم مستند الخرج؟
ج: 16 ميغا، كأي مستند MongoDB. إن أنتج الـ pipeline أكثر من 16 ميغا في نتيجة واحدة، استعمل $out أو $merge للتثبيت ثم اقرأ على دفعات.

س: هل يمكن تنفيذ pipeline من مُشغّل تطبيقي؟
ج: نعم. الدالّة collection.aggregate(pipeline, options) موجودة في كلّ المُشغّلات الرسمية (Node.js، Python، Java، Go، Rust). تُسلسَل مصفوفة الـ pipeline إلى BSON وتُرسَل كما هي للخادم.

س: هل $facet أسرع من عدّة استعلامات؟
ج: نعم، لأنّ المجموعة تُقرأ مرّة واحدة فقط. N أنابيب فرعية تتشارك فحص الفهرس المبدئي. زمن استجابة نداء $facet واحد نمطيًّا أدنى بـ 30 إلى 60% من مجموع N نداءات منفصلة على الفلتر ذاته.

للتعمّق

Sponsoriser ce contenu

Cet emplacement est à vous

Position premium en fin d'article — c'est l'instant où les lecteurs sont le plus engagés. Réservez cet espace pour votre marque, votre formation ou votre offre.

Recevoir nos tarifs
Publicité