Business Digital

Kubeflow Pipelines : orchestrer un workflow ML sur Kubernetes

17 دقائق للقراءة
Vue d’ensemble
Vous allez installer Kubeflow Pipelines standalone (KFP 2.15) sur un cluster Kubernetes, ecrire deux composants Python (preparation des donnees + entrainement), les chainer dans une pipeline avec le SDK kfp 2.16, compiler en YAML, lancer le run depuis Python, observer l’execution dans l’UI, puis ajouter parametres, artifacts et planification recurrente. Duree estimee : 60 a 90 minutes pour le premier run reussi.

Introduction

Entrainer un modele en production ne se resume jamais a un script train.py. Le pipeline reel enchaine au minimum quatre etapes : recuperation et nettoyage des donnees, entrainement, evaluation des metriques, sauvegarde du modele dans un registre. Chaque etape a ses dependances, ses ressources (CPU, GPU, memoire), ses durees, et son taux d’echec. Lance a la main, ce flux devient ingerable des qu’on doit le rejouer trois fois par jour sur des donnees fraiches, le distribuer sur plusieurs noeuds, ou tracer quel jeu de donnees a produit quel modele.

La question concrete pour une equipe data dans une organisation tech qui passe du notebook au cluster partage : comment orchestrer ces etapes de maniere reproductible, scalable, et visible par toute l’equipe ? Kubeflow Pipelines repond a ce besoin en transformant chaque etape en conteneur Kubernetes, en decrivant les dependances dans un DAG Python, et en exposant runs, logs et artifacts via une UI web. Ce tutoriel vous fait deployer KFP 2.15 sur un cluster existant et executer une pipeline complete de bout en bout.

Kubeflow Pipelines vs Airflow vs Argo Workflows

Les trois outils orchestrent des DAG sur Kubernetes mais ne ciblent pas le meme public. Apache Airflow est generaliste : ETL, jobs analytiques, ingestion. Son ecosysteme d’operateurs est riche mais le ML n’est pas son centre de gravite ; pas de notion native d’artifact typee, pas d’UI pensee pour comparer des runs d’entrainement.

Argo Workflows est le moteur d’orchestration generique de Kubernetes. C’est d’ailleurs le moteur sous le capot de Kubeflow Pipelines. On le manipule en YAML brut, ce qui le rend puissant mais verbeux pour un data scientist qui veut rester en Python.

Kubeflow Pipelines ajoute trois choses qui changent la vie d’une equipe ML : un SDK Python (kfp) qui genere les manifests Argo a partir de decorateurs, un systeme d’artifacts type (Dataset, Model, Metrics) qui se branche sur un object store (MinIO ou S3), et une UI qui compare visuellement les runs entre eux. Si votre cible est strictement ML et que vos data scientists ne veulent pas ecrire de YAML, KFP est le bon choix.

Prerequis

  • Un cluster Kubernetes accessible (kubeadm, k3s, kind, EKS, GKE, AKS). Version 1.29 a 1.33 recommandee pour KFP 2.15 ; les versions 1.27-1.28 fonctionnent encore mais ne sont plus la cible.
  • Au moins 4 vCPU et 8 Go de RAM disponibles sur le cluster pour les pods KFP (API server, MySQL, MinIO, controller, UI).
  • Une StorageClass par defaut qui provisionne dynamiquement des PVC. Verifiez avec kubectl get sc : une ligne doit etre marquee (default).
  • kubectl ≥ 1.27 configure pour viser le bon cluster (kubectl get nodes doit lister vos noeuds).
  • Python 3.9 ou plus recent et pip a jour sur votre poste local.
  • Un dossier de travail vide : mkdir kfp-tuto && cd kfp-tuto.

Si vous n’avez pas encore de cluster, un k3s mono-noeud sur un VPS 4 Go suffit pour ce tutoriel. Evitez Docker Desktop Kubernetes : la StorageClass par defaut y est limitee et MinIO refusera de demarrer.

Etape 1 — Installer Kubeflow Pipelines standalone

Le mode standalone deploie uniquement KFP, sans les autres composants Kubeflow (Notebooks, Katib, KServe). C’est ce que vous voulez pour commencer : moins de pods, moins de RAM consommee, installation en 3 minutes. Les manifests officiels sont publies dans le depot kubeflow/pipelines sous forme de kustomize, taggues par version. Au moment ou j’ecris, la derniere version stable est 2.15.0.

Definissez la variable de version puis appliquez les deux manifests dans l’ordre. Le premier installe les CRD et les RBAC cluster-scoped, le second installe les composants KFP dans le namespace kubeflow :

export PIPELINE_VERSION=2.15.0

kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"

kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io

kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=$PIPELINE_VERSION"

La commande kubectl wait entre les deux apply n’est pas cosmetique : sans elle, le second manifest peut echouer parce que le CRD Application n’est pas encore disponible cote API server. Surveillez le deploiement avec kubectl -n kubeflow get pods -w. Vous devez voir une dizaine de pods passer en Running en 2 a 3 minutes : ml-pipeline, ml-pipeline-ui, mysql, minio, workflow-controller, cache-server, metadata-grpc-deployment, metadata-envoy-deployment.

Signal de reussite : kubectl -n kubeflow get pods retourne tous les pods en Running avec 1/1 ou 2/2 READY. Si un pod reste en Pending, lancez kubectl -n kubeflow describe pod <nom> : 9 fois sur 10 le probleme vient d’une StorageClass manquante ou d’un noeud sans ressources.

Etape 2 — Acceder a l’UI via port-forward

L’UI de KFP est exposee par le service ml-pipeline-ui sur le port 80 dans le cluster. En dev, on ouvre un tunnel local plutot que de configurer un Ingress :

kubectl -n kubeflow port-forward svc/ml-pipeline-ui 8080:80

Laissez la commande tourner dans un terminal dedie. Ouvrez http://localhost:8080 dans votre navigateur. Vous tombez sur la page Pipelines avec, dans la sidebar, les sections Runs, Recurring Runs, Experiments et Artifacts. Aucun pipeline n’est encore liste : c’est normal, on va en uploader un dans les etapes suivantes.

Signal de reussite : la page d’accueil affiche le menu lateral et le tableau central est vide avec le message No pipelines found. Si la page reste blanche, verifiez que le tunnel est toujours actif (Handling connection dans le terminal du port-forward) et que votre navigateur ne bloque pas localhost.

Etape 3 — Creer un client Python kfp.Client()

Le SDK kfp est le pont entre votre poste local et l’API de KFP. Installez-le dans un virtualenv pour eviter les conflits avec d’autres projets Python :

python -m venv .venv
source .venv/bin/activate  # sous Windows : .venv\Scripts\activate
pip install --upgrade pip
pip install "kfp>=2.16,<3.0"

La version 2.16.1 est la derniere stable au moment de la redaction. Verifiez avec pip show kfp ; le champ Version doit afficher 2.16.1 ou superieur. Ne melangez jamais SDK 1.x et serveur 2.x : les pipelines compilees ne seront pas reconnues.

Creez un fichier client.py qui ouvre une connexion vers le port-forward :

import kfp

client = kfp.Client(host="http://localhost:8080")
print(client.list_experiments())

Lancez python client.py. Si la connexion fonctionne, vous obtenez un objet ApiListExperimentsResponse avec un champ experiments=None (aucune experience definie) et total_size=0. C’est le retour attendu pour un cluster KFP frais. Une erreur Connection refused signifie que le port-forward de l’etape 2 a ete coupe : relancez-le et reessayez.

Note pratique : si vous deployez plus tard derriere un Ingress avec authentification, passez le token dans le constructeur via existing_token=<jwt> ou utilisez kfp.Client(host="https://kfp.example.com", existing_token=os.environ["KFP_TOKEN"]).

Etape 4 — Ecrire des composants avec @dsl.component

Un composant KFP est une fonction Python decoree qui sera empaquetee dans un conteneur et executee comme un pod. Le decorateur @dsl.component capture la signature de la fonction (types des arguments et de la valeur de retour) et genere un schema YAML que le moteur Argo comprend.

Creez components.py avec deux composants : un qui prepare un jeu de donnees synthetique, un qui entraine un modele logistique scikit-learn dessus :

from kfp import dsl
from kfp.dsl import Output, Dataset, Model

@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["scikit-learn==1.5.2", "pandas==2.2.3"],
)
def prepare_data(dataset: Output[Dataset]) -> None:
    """Genere un CSV synthetique de classification binaire."""
    import pandas as pd
    from sklearn.datasets import make_classification

    X, y = make_classification(n_samples=1000, n_features=10, random_state=42)
    df = pd.DataFrame(X, columns=[f"f{i}" for i in range(10)])
    df["label"] = y
    df.to_csv(dataset.path, index=False)


@dsl.component(
    base_image="python:3.11-slim",
    packages_to_install=["scikit-learn==1.5.2", "pandas==2.2.3", "joblib==1.4.2"],
)
def train_model(dataset: dsl.Input[Dataset], model: Output[Model]) -> float:
    """Entraine une regression logistique et retourne l'accuracy."""
    import pandas as pd
    import joblib
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score

    df = pd.read_csv(dataset.path)
    X = df.drop(columns=["label"])
    y = df["label"]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    clf = LogisticRegression(max_iter=1000).fit(X_train, y_train)
    acc = accuracy_score(y_test, clf.predict(X_test))

    joblib.dump(clf, model.path)
    return float(acc)

Trois points a retenir dans ce code. D’abord, base_image declare l’image de base ; KFP construira un layer par-dessus pour installer packages_to_install avant d’executer la fonction. Ensuite, les types Output[Dataset] et Output[Model] sont des artifacts : KFP injecte automatiquement un chemin de fichier (dataset.path, model.path) qui sera uploade vers MinIO apres l’execution du composant. Enfin, la valeur de retour float devient un parametre visible dans l’UI et reutilisable par les composants en aval.

Le composant train_model recoit le dataset produit par prepare_data grace au type dsl.Input[Dataset]. La dependance entre les deux ne sera pas codee a la main : on la deduira en passant la sortie du premier en argument du second dans la pipeline.

Etape 5 — Composer une pipeline avec @dsl.pipeline

Une pipeline est une fonction decoree par @dsl.pipeline qui appelle les composants comme des fonctions normales. Le retour de chaque appel est un objet PipelineTask qui porte des attributs .output ou .outputs["nom"] selon le nombre de sorties. Ajoutez dans components.py (en bas du fichier) :

@dsl.pipeline(
    name="train-classifier",
    description="Prepare un dataset synthetique et entraine une regression logistique.",
)
def training_pipeline() -> float:
    prep_task = prepare_data()
    train_task = train_model(dataset=prep_task.outputs["dataset"])
    return train_task.output

Notez que vous ne passez pas explicitement dataset en sortie de prepare_data : KFP infere le DAG en lisant le graphe des references entre .outputs[...] et les arguments. C’est cette inference qui rend le SDK confortable comparativement a Argo YAML brut.

Le retour de la pipeline (train_task.output) est l’accuracy float retournee par train_model. Elle apparaitra comme pipeline output dans l’UI, ce qui permet de comparer rapidement plusieurs runs entre eux sans rouvrir chaque pod.

Etape 6 — Compiler en YAML

Avant de soumettre la pipeline au serveur, on la compile en un fichier YAML autonome. Cette compilation est entierement locale, elle ne necessite pas de connexion au cluster. C’est aussi a ce moment qu’on detecte les erreurs de typage ou de signature.

Creez compile.py :

from kfp import compiler
from components import training_pipeline

compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path="training_pipeline.yaml",
)
print("Pipeline compilee dans training_pipeline.yaml")

Lancez python compile.py. Le fichier training_pipeline.yaml apparait dans le dossier courant : c’est un Pipeline IR au format Argo Workflow, lisible mais verbeux (300 a 500 lignes pour ce tutoriel). Vous pouvez l’ouvrir pour inspection : la section components liste vos deux composants avec leurs inputDefinitions et outputDefinitions, et la section root.dag decrit les dependances inferees.

Signal de reussite : le script affiche le message Pipeline compilee dans training_pipeline.yaml sans trace d’erreur et le fichier YAML fait au moins 200 lignes. Une erreur typique a ce stade est TypeError: ... is not a valid input type : vous avez probablement annote un argument avec un type Python natif (comme list sans parametrisation) au lieu d’un type KFP supporte.

Etape 7 — Soumettre la pipeline

Le client Python expose plusieurs methodes pour lancer une pipeline. La plus directe en dev est create_run_from_pipeline_func : elle compile a la volee et lance le run en une seule commande, sans passer par le fichier YAML. Creez run.py :

import kfp
from components import training_pipeline

client = kfp.Client(host="http://localhost:8080")
run = client.create_run_from_pipeline_func(
    pipeline_func=training_pipeline,
    arguments={},
    experiment_name="demo-classifier",
    run_name="first-run",
)
print(f"Run lance : {run.run_id}")
print(f"URL UI : http://localhost:8080/#/runs/details/{run.run_id}")

Lancez python run.py. Le script imprime un run_id au format UUID et une URL directe vers le run. Ouvrez cette URL dans le navigateur : vous voyez le DAG avec deux noeuds (prepare-data, train-model) qui passent successivement de gris (Pending) a bleu (Running) puis vert (Succeeded). Le premier run prend 3 a 5 minutes parce que KFP doit pull l’image python:3.11-slim et installer scikit-learn dans chaque pod.

Signal de reussite : les deux noeuds passent au vert et l’onglet Output Parameters du noeud train-model affiche une accuracy autour de 0.86. Si le run echoue en ImagePullBackOff, votre cluster n’a pas acces a Docker Hub : configurez un mirror ou un registry interne avant de continuer.

Etape 8 — Parametres et artifacts

Une pipeline figee n’a pas grand interet : on veut faire varier la taille du dataset, le seuil de regularisation, l’algo. KFP permet de declarer des parametres au niveau de la pipeline et de les propager aux composants. Modifiez prepare_data et training_pipeline pour exposer n_samples :

@dsl.component(...)
def prepare_data(n_samples: int, dataset: Output[Dataset]) -> None:
    import pandas as pd
    from sklearn.datasets import make_classification
    X, y = make_classification(n_samples=n_samples, n_features=10, random_state=42)
    df = pd.DataFrame(X, columns=[f"f{i}" for i in range(10)])
    df["label"] = y
    df.to_csv(dataset.path, index=False)

@dsl.pipeline(name="train-classifier")
def training_pipeline(n_samples: int = 1000) -> float:
    prep_task = prepare_data(n_samples=n_samples)
    train_task = train_model(dataset=prep_task.outputs["dataset"])
    return train_task.output

Le parametre n_samples avec sa valeur par defaut 1000 apparaitra dans l’UI au lancement d’un nouveau run : un champ texte permet de le surcharger. Cote Python, on le passe via le dictionnaire arguments :

client.create_run_from_pipeline_func(
    pipeline_func=training_pipeline,
    arguments={"n_samples": 5000},
    experiment_name="demo-classifier",
    run_name="run-5k",
)

Cote artifacts, le fichier dataset.csv produit par prepare_data et le modele model.joblib produit par train_model sont automatiquement uploades dans MinIO (le bucket mlpipeline du namespace kubeflow). Cliquez sur un noeud dans l’UI puis sur l’onglet Artifacts : un lien Download permet de recuperer le fichier brut. C’est la base de la reproductibilite : six mois plus tard, vous retrouvez exactement le dataset et le modele d’un run donne.

Etape 9 — Scheduler une pipeline recurrente

Beaucoup de pipelines doivent tourner en boucle : ingestion nocturne, re-entrainement hebdomadaire, sanity check horaire. KFP expose la notion de Recurring Run qui est un cron applique a une pipeline deja uploadee.

Uploadez d’abord la pipeline compilee via l’UI (bouton Upload pipeline en haut a droite de la page Pipelines, choisissez training_pipeline.yaml) ou par API :

pipeline = client.upload_pipeline(
    pipeline_package_path="training_pipeline.yaml",
    pipeline_name="train-classifier",
)
print(pipeline.pipeline_id)

Recuperez la version de la pipeline et l’ID de l’experience, puis creez un job recurrent qui tourne tous les jours a 2h du matin :

experiment = client.create_experiment(name="nightly-retrain")
version = client.list_pipeline_versions(pipeline.pipeline_id).pipeline_versions[0]

client.create_recurring_run(
    experiment_id=experiment.experiment_id,
    job_name="nightly-train",
    pipeline_id=pipeline.pipeline_id,
    version_id=version.pipeline_version_id,
    cron_expression="0 0 2 * * *",  # 6 champs : seconde minute heure jour mois jour-semaine
    params={"n_samples": 10000},
)

Attention au format cron : KFP utilise un cron a 6 champs (incluant les secondes), pas le cron Unix classique a 5 champs. 0 0 2 * * * signifie tous les jours a 02:00:00. Verifiez le job dans l’UI sous Recurring Runs ; le statut doit afficher Enabled. Vous pouvez le suspendre via client.disable_recurring_run(job_id) sans le supprimer.

Etape 10 — Logs et debug d’un run echoue

Tot ou tard un run va echouer. Les causes typiques : OOMKilled sur un pod gourmand, image Docker introuvable, package Python en version incompatible, secret manquant. KFP expose trois niveaux d’investigation.

Dans l’UI, cliquez sur le noeud rouge du DAG, puis sur l’onglet Logs : vous voyez la sortie stdout/stderr du conteneur en direct. C’est le premier reflexe. Si le pod a ete tue avant d’avoir pu logguer (OOMKilled), passez en CLI :

kubectl -n kubeflow get pods --sort-by=.metadata.creationTimestamp | tail -20
kubectl -n kubeflow describe pod <nom-du-pod>
kubectl -n kubeflow logs <nom-du-pod> --all-containers --previous

La section Events de describe pod revele les erreurs Kubernetes (FailedScheduling, OOMKilled, ImagePullBackOff). Le flag --previous sur logs recupere les logs du conteneur avant son dernier crash, utile quand un pod redemarre en boucle.

Pour les soucis cote API server KFP, regardez les logs du pod ml-pipeline :

kubectl -n kubeflow logs deploy/ml-pipeline -f

Une erreur frequente : workflow does not exist. Elle signifie qu’un run a ete supprime cote Argo mais que l’entree subsiste dans MySQL. La purge se fait via client.archive_run(run_id) ou directement dans l’UI (bouton Archive sur la page du run).

Erreurs frequentes

Symptome Cause probable Correction
Pod minio en Pending apres install Pas de StorageClass par defaut sur le cluster Installer local-path-provisioner ou marquer une SC existante : kubectl patch sc <nom> -p '{"metadata":{"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'
kfp.Client : Connection refused Port-forward coupe ou mauvais port Relancer kubectl -n kubeflow port-forward svc/ml-pipeline-ui 8080:80 dans un terminal dedie
Compilation : TypeError invalid input type Annotation Python non supportee (ex : list non parametrise) Utiliser list[str] ou les types KFP Input/Output[Dataset/Model/Artifact]
Run bloque en Pending indefiniment Pas assez de CPU/RAM sur les noeuds kubectl describe pod sur le pod du noeud Argo, lire la section Events
Composant OOMKilled au milieu de l’entrainement Dataset trop gros pour la RAM par defaut du pod Ajouter .set_memory_limit("4Gi") sur le PipelineTask dans la pipeline
Recurring run jamais declenche Cron a 5 champs au lieu de 6 Verifier que l’expression a 6 champs incluant les secondes : 0 0 2 * * *
upload_pipeline : pipeline name already exists Une pipeline du meme nom existe deja Utiliser upload_pipeline_version pour ajouter une nouvelle version, ou supprimer l’ancienne

Ressources officielles

Tutoriels associes

FAQ

Puis-je utiliser KFP sans Kubeflow complet ?
Oui, c’est exactement le mode standalone de ce tutoriel. Vous installez uniquement les composants Pipelines (API server, UI, MySQL, MinIO, workflow controller) sans Notebooks, sans Katib, sans KServe. C’est la facon la plus rapide d’essayer KFP sur un cluster existant.

Quelle difference entre create_run_from_pipeline_func et create_run_from_pipeline_package ?
La premiere compile la pipeline a la volee depuis la fonction Python, pratique en dev. La seconde prend un fichier YAML deja compile, pratique en CI/CD ou pour rejouer un run exact. Pour la production, compilez une fois et soumettez le YAML.

Comment passer un GPU a un composant ?
Sur le PipelineTask retourne par l’appel du composant dans la pipeline, ajoutez .set_accelerator_type("nvidia.com/gpu").set_accelerator_limit(1). Le cluster doit avoir le NVIDIA device plugin installe et au moins un noeud avec GPU disponible.

Mes artifacts disparaissent au bout de quelques jours, normal ?
MinIO embarque par KFP standalone utilise un PVC unique. Si vous lancez beaucoup de runs avec de gros artifacts, l’espace se remplit et les anciens objets peuvent etre evinces par les politiques de retention. En production, branchez KFP sur un object store externe (S3, GCS, MinIO dedie) via la config kfp-launcher.

Comment versionner une pipeline ?
Apres un premier upload_pipeline, utilisez client.upload_pipeline_version_from_pipeline_package_path avec le meme pipeline_id. Chaque version garde son YAML compile et peut etre relancee independamment, ce qui permet de comparer deux versions cote a cote dans l’UI.

KFP gere-t-il les pipelines distribuees sur plusieurs noeuds (entrainement multi-GPU) ?
KFP orchestre des composants, chaque composant etant un pod. Pour de l’entrainement distribue sur plusieurs noeuds, utilisez un composant qui lance un PyTorchJob ou un TFJob via l’operateur Training de Kubeflow. KFP attend la fin du job avant de passer a l’etape suivante.

Tutoriels associés (complément)

Service ITSkillsCenter

Site ou application web sur mesure

Conception Pro + Nom de domaine 1 an + Hébergement 1 an + Formation + Support 6 mois. Accès et code livrés. À partir de 350 000 FCFA.

Demander un devis
Publicité