📌 نظرة عامّة: MLOps حديث: من النموذج إلى الإنتاج
نظرة عامّة
ستُثَبِّت Kubeflow Pipelines standalone (KFP 2.15) على cluster Kubernetes، تكتب مكوّنَين Python (تحضير البيانات + تدريب)، تُسَلسلهما في pipeline بـ SDK kfp 2.16، تُجَمِّع إلى YAML، تُشَغِّل run من Python، تُراقب التنفيذ في الواجهة، ثم تُضيف parameters، artifacts، وجَدولة دورية. المدّة المتوقّعة: 60 إلى 90 دقيقة لأوّل run ناجح.
المقدّمة
تدريب نموذج في الإنتاج لا يختزل أبدًا في script train.py. الـ pipeline الحقيقي يُسَلسِل على الأقلّ أربع مراحل: استرداد البيانات وتنظيفها، التدريب، تقييم metrics، حفظ النموذج في registre. كلّ مرحلة لها تبعيّاتها، مواردها (CPU، GPU، ذاكرة)، مدّتها، ومعدّل إخفاقها. مُشَغَّلًا يدويًّا، يصير هذا التدفّق غير قابل للإدارة فور وجوب إعادته 3 مرّات يوميًّا.
السؤال الملموس لفريق data: كيف نُنَسِّق هذه المراحل بشكل قابل للإعادة وللتوسيع ومرئي؟ Kubeflow Pipelines يُجيب بتحويل كلّ مرحلة إلى container Kubernetes، بوصف التبعيّات في DAG Python، وبكشف runs وlogs وartifacts عبر UI ويب.
Kubeflow Pipelines مقابل Airflow مقابل Argo Workflows
الثلاثة يُنَسِّقون DAGs على Kubernetes لكن لا يستهدفون نفس الجمهور. Apache Airflow عامّ: ETL، jobs تحليلية، استيعاب. منظومته غنية لكنّ ML ليس مركز ثقله. Argo Workflows محرّك التنسيق العامّ لـ Kubernetes — وهو المحرّك تحت غطاء Kubeflow Pipelines. يُتعامَل معه بـ YAML خامّ، قوي لكن مُسهَب.
Kubeflow Pipelines يُضيف: SDK Python (kfp) يُوَلِّد manifests Argo من decorators، نظام artifacts مُكَتَّب (Dataset، Model، Metrics) متّصل بـ object store (MinIO أو S3)، وUI يُقارن runs بصريًّا. إن كان الهدف ML صرفًا وdata scientists لا يريدون كتابة YAML، KFP الخيار الصحيح.
المتطلّبات
- cluster Kubernetes (kubeadm، k3s، kind، EKS، GKE، AKS). النسخة 1.29 إلى 1.33 موصى بها لـ KFP 2.15.
- على الأقلّ 4 vCPU و8 جيغا RAM متاحة لـ pods KFP (API server، MySQL، MinIO، controller، UI).
- StorageClass افتراضي يُوَفِّر PVC ديناميكيًّا. تحقّق بـ
kubectl get sc. kubectl≥ 1.27 مُهَيَّأ.- Python 3.9+ وpip محدَّث.
- مجلّد عمل فارغ:
mkdir kfp-tuto && cd kfp-tuto.
إن لم يكن لديك cluster، k3s mononeud على VPS 4 جيغا يكفي. تجنّب Docker Desktop Kubernetes: StorageClass الافتراضي محدود وMinIO يرفض الإقلاع.
الخطوة 1 — تثبيت Kubeflow Pipelines standalone
وضع standalone ينشر KFP فقط، دون مكوّنات Kubeflow الأخرى (Notebooks، Katib، KServe). أقلّ pods، أقلّ RAM، تثبيت في 3 دقائق.
export PIPELINE_VERSION=2.15.0
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=$PIPELINE_VERSION"
الأمر kubectl wait ليس تجميليًّا: دونه، الـ manifest الثاني قد يفشل لأنّ CRD Application لم يكن جاهزًا. راقب بـ kubectl -n kubeflow get pods -w. يجب رؤية نحو 10 pods تمرّ إلى Running في 2 إلى 3 دقائق: ml-pipeline، ml-pipeline-ui، mysql، minio، workflow-controller، cache-server، metadata-grpc-deployment، metadata-envoy-deployment.
الخطوة 2 — الوصول إلى UI عبر port-forward
UI لـ KFP يكشفه service ml-pipeline-ui على المنفذ 80. في dev، نفتح نفقًا محلّيًّا.
kubectl -n kubeflow port-forward svc/ml-pipeline-ui 8080:80
افتح http://localhost:8080. تقع على صفحة Pipelines مع sidebar: Runs، Recurring Runs، Experiments، Artifacts. لا pipeline مُدرَجة بعد.
الخطوة 3 — إنشاء client Python kfp.Client()
python -m venv .venv
source .venv/bin/activate
pip install --upgrade pip
pip install "kfp>=2.16,<3.0"
النسخة 2.16.1 الأخيرة المستقرّة. لا تخلط SDK 1.x وserveur 2.x.
# client.py
import kfp
client = kfp.Client(host="http://localhost:8080")
print(client.list_experiments())
إن نجح الاتّصال، تحصل على ApiListExperimentsResponse بحقل experiments=None وtotal_size=0. خطأ Connection refused يعني أنّ port-forward مقطوع.
الخطوة 4 — كتابة مكوّنات بـ @dsl.component
مكوّن KFP دالّة Python مُزَيَّنة ستُغَلَّف في container وتُنَفَّذ كـ pod. الـ decorator @dsl.component يلتقط توقيع الدالّة ويُوَلِّد schema YAML.
# components.py
from kfp import dsl
from kfp.dsl import Output, Dataset, Model
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["scikit-learn==1.5.2", "pandas==2.2.3"],
)
def prepare_data(dataset: Output[Dataset]) -> None:
"""يولّد CSV اصطناعيًّا لتصنيف ثنائي."""
import pandas as pd
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
df = pd.DataFrame(X, columns=[f"f{i}" for i in range(10)])
df["label"] = y
df.to_csv(dataset.path, index=False)
@dsl.component(
base_image="python:3.11-slim",
packages_to_install=["scikit-learn==1.5.2", "pandas==2.2.3", "joblib==1.4.2"],
)
def train_model(dataset: dsl.Input[Dataset], model: Output[Model]) -> float:
"""يُدرِّب regression لوجستية ويُرجع accuracy."""
import pandas as pd
import joblib
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
df = pd.read_csv(dataset.path)
X = df.drop(columns=["label"])
y = df["label"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
clf = LogisticRegression(max_iter=1000).fit(X_train, y_train)
acc = accuracy_score(y_test, clf.predict(X_test))
joblib.dump(clf, model.path)
return float(acc)
base_image يُعلن الصورة الأساس. Output[Dataset] وOutput[Model] artifacts: KFP يحقن مسار ملفّ تلقائيًّا يُرفَع إلى MinIO. القيمة المُرجَعة float تصير parameter مرئي في UI وقابل لإعادة الاستعمال.
الخطوة 5 — تركيب pipeline بـ @dsl.pipeline
@dsl.pipeline(
name="train-classifier",
description="يُحَضِّر dataset اصطناعيًّا ويُدَرِّب regression لوجستية.",
)
def training_pipeline() -> float:
prep_task = prepare_data()
train_task = train_model(dataset=prep_task.outputs["dataset"])
return train_task.output
لا تُمَرِّر dataset صراحة: KFP يستنتج DAG بقراءة المراجع. القيمة المُرجَعة train_task.output هي accuracy المُرجَعة من train_model. تظهر كـ pipeline output في UI.
الخطوة 6 — التجميع إلى YAML
# compile.py
from kfp import compiler
from components import training_pipeline
compiler.Compiler().compile(
pipeline_func=training_pipeline,
package_path="training_pipeline.yaml",
)
print("Pipeline compilée dans training_pipeline.yaml")
التجميع محلّي بالكامل. الملفّ training_pipeline.yaml يظهر: Pipeline IR بصيغة Argo Workflow، قابل للقراءة لكن مُسهَب (300 إلى 500 سطر).
الخطوة 7 — تقديم pipeline
# run.py
import kfp
from components import training_pipeline
client = kfp.Client(host="http://localhost:8080")
run = client.create_run_from_pipeline_func(
pipeline_func=training_pipeline,
arguments={},
experiment_name="demo-classifier",
run_name="first-run",
)
print(f"Run lancé : {run.run_id}")
print(f"URL UI : http://localhost:8080/#/runs/details/{run.run_id}")
يَطبَع run_id بصيغة UUID وURL مباشر. افتح URL: ترى DAG بعقدتَين (prepare-data، train-model) تمرّ تتابعًا من رمادي إلى أزرق إلى أخضر. أوّل run يأخذ 3 إلى 5 دقائق لـ pull python:3.11-slim. accuracy المتوقّعة حوالي 0.86.
الخطوة 8 — Parameters وartifacts
pipeline ثابت لا قيمة كبيرة له: نريد تغيير حجم dataset، عتبة regularisation، الخوارزمية. KFP يُتيح إعلان parameters على مستوى pipeline ونشرها إلى المكوّنات.
@dsl.component(...)
def prepare_data(n_samples: int, dataset: Output[Dataset]) -> None:
import pandas as pd
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=n_samples, n_features=10, random_state=42)
df = pd.DataFrame(X, columns=[f"f{i}" for i in range(10)])
df["label"] = y
df.to_csv(dataset.path, index=False)
@dsl.pipeline(name="train-classifier")
def training_pipeline(n_samples: int = 1000) -> float:
prep_task = prepare_data(n_samples=n_samples)
train_task = train_model(dataset=prep_task.outputs["dataset"])
return train_task.output
client.create_run_from_pipeline_func(
pipeline_func=training_pipeline,
arguments={"n_samples": 5000},
experiment_name="demo-classifier",
run_name="run-5k",
)
الـ artifacts (dataset.csv، model.joblib) تُرفَع تلقائيًّا إلى MinIO (bucket mlpipeline). انقر عقدة في UI ثم تبويب Artifacts: رابط Download يسترد الملفّ.
الخطوة 9 — جَدولة pipeline دورية
كثير من pipelines يجب أن تدور حلقيًّا: استيعاب ليلي، إعادة تدريب أسبوعية. KFP يكشف Recurring Run: cron مُطَبَّق على pipeline مُرفوعة.
pipeline = client.upload_pipeline(
pipeline_package_path="training_pipeline.yaml",
pipeline_name="train-classifier",
)
print(pipeline.pipeline_id)
experiment = client.create_experiment(name="nightly-retrain")
version = client.list_pipeline_versions(pipeline.pipeline_id).pipeline_versions[0]
client.create_recurring_run(
experiment_id=experiment.experiment_id,
job_name="nightly-train",
pipeline_id=pipeline.pipeline_id,
version_id=version.pipeline_version_id,
cron_expression="0 0 2 * * *", # 6 حقول: ثانية دقيقة ساعة يوم شهر يوم-أسبوع
params={"n_samples": 10000},
)
انتبه: KFP يستعمل cron بـ 6 حقول (تشمل الثواني)، لا cron Unix الكلاسيكي بـ 5 حقول. 0 0 2 * * * يعني كلّ يوم على 02:00:00.
الخطوة 10 — Logs وتنقيح run مُخفِق
في UI، انقر العقدة الحمراء، ثم تبويب Logs: ترى stdout/stderr للـ container. الردّ الأوّل. إن قُتل الـ pod (OOMKilled)، انتقل إلى CLI:
kubectl -n kubeflow get pods --sort-by=.metadata.creationTimestamp | tail -20
kubectl -n kubeflow describe pod <nom-du-pod>
kubectl -n kubeflow logs <nom-du-pod> --all-containers --previous
قسم Events في describe pod يكشف أخطاء Kubernetes (FailedScheduling، OOMKilled، ImagePullBackOff). للـ API server KFP:
kubectl -n kubeflow logs deploy/ml-pipeline -f
أخطاء شائعة
| العَرَض | السبب | التصحيح |
|---|---|---|
| pod minio في Pending | لا StorageClass افتراضي | ثبّت local-path-provisioner أو علِّم SC قائمًا افتراضيًّا |
| Connection refused في kfp.Client | port-forward مقطوع | أعد kubectl -n kubeflow port-forward svc/ml-pipeline-ui 8080:80 |
| TypeError invalid input type | annotation Python غير مدعوم | استعمل list[str] أو أنواع KFP Input/Output[Dataset/Model/Artifact] |
| Run محجوب في Pending | CPU/RAM غير كافية | kubectl describe pod واقرأ Events |
| مكوّن OOMKilled | dataset أكبر من RAM الافتراضي | أضف .set_memory_limit("4Gi") على PipelineTask |
| Recurring run لم يُشَغَّل | cron بـ 5 حقول بدل 6 | تحقّق من 6 حقول: 0 0 2 * * * |
| pipeline name already exists | اسم pipeline موجود | استعمل upload_pipeline_version أو احذف القديمة |
مصادر رسمية
- توثيق Kubeflow Pipelines
- دليل التثبيت standalone
- مرجع SDK Python
kfp - Releases GitHub
kubeflow/pipelines - مصفوفة توافق النسخ
الأدلّة المرتبطة
- MLOps حديث
- MLflow Model Registry وCI/CD
- تقديم نموذج بـ BentoML
- كشف drift بـ Evidently
- Feast: feature store
FAQ
أيمكن استعمال KFP دون Kubeflow كامل؟ نعم، هذا وضع standalone في هذا الدليل. تُثَبِّت فقط مكوّنات Pipelines.
الفرق بين create_run_from_pipeline_func وcreate_run_from_pipeline_package؟ الأولى تُجَمِّع pipeline على الطاير، عملية في dev. الثانية تأخذ YAML مُجَمَّعًا، عملية في CI/CD.
كيف نُمَرِّر GPU إلى مكوّن؟ على PipelineTask: .set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1). الـ cluster يحتاج NVIDIA device plugin مُثَبَّت.
artifacts تختفي بعد أيّام، طبيعي؟ MinIO المُضَمَّن في KFP standalone يستعمل PVC وحيد. في الإنتاج، اربط KFP على object store خارجي.
كيف نُؤَرِّخ pipeline؟ بعد أوّل upload_pipeline، استعمل client.upload_pipeline_version_from_pipeline_package_path بنفس pipeline_id.
KFP يُدير pipelines موزَّعة (تدريب multi-GPU)؟ KFP يُنَسِّق مكوّنات، كلّ مكوّن pod. للتدريب الموزَّع على عدّة عقد، استعمل مكوّنًا يُشَغِّل PyTorchJob أو TFJob عبر مُشَغِّل Training Kubeflow.