Asyncio est devenu en 2026 la fondation par défaut pour tout service Python qui doit gérer du I/O concurrent à grande échelle : API REST, workers d’arrière-plan, scrapers, agrégateurs de WebSockets, pipelines de données. Le module standard asyncio a beaucoup évolué depuis Python 3.7 et bénéficie en 3.11+ de TaskGroup (structured concurrency) et asyncio.timeout() qui simplifient radicalement la gestion des erreurs et des timeouts. Ce tutoriel reprend les briques essentielles pour bâtir un service asynchrone qui ne lâche pas la rampe en production : event loop bien initialisée, coroutines disciplinées, gestion fine des annulations, intégration HTTP et base de données, et observabilité.
Prérequis
- Python 3.14.5 (recommandé) ou 3.13.13 (cf. Installer Python 3)
- Notions de coroutines, mots-clés
asyncetawait - Un projet Python avec
pyproject.toml - Temps estimé : 90 minutes
Étape 1 — Démarrer correctement l’event loop
Un programme asyncio se réduit, en surface, à un point d’entrée asyncio.run(main()). Sous le capot, cette fonction crée une event loop, exécute la coroutine principale jusqu’à terminaison, puis ferme proprement la loop. Pour la majorité des cas, c’est la bonne abstraction — on ne touche jamais directement à get_event_loop() ni à new_event_loop(), qui sont sources de bugs subtils.
import asyncio
import logging
async def main():
logging.info("Service démarré")
await travailler()
logging.info("Service arrêté proprement")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Arrêt sur Ctrl+C")
Trois points discrets mais critiques. asyncio.run() ne tolère qu’un appel par processus, ce qui force une discipline structurante : on ne dispatche pas des sous-asyncio depuis une fonction synchrone aléatoire. KeyboardInterrupt est capturé pour journaliser l’arrêt et permettre aux tâches en cours de terminer leur teardown. Sur Windows, asyncio choisit automatiquement le ProactorEventLoop ; sur Linux et macOS, le SelectorEventLoop. Inutile d’imposer l’un ou l’autre sauf cas particulier (sous-process, FFI).
Étape 2 — Coroutines, await et asyncio.gather
Une coroutine est définie par async def. Elle est paresseuse : son corps ne s’exécute que quand on la await. Pour lancer plusieurs coroutines en parallèle (et non séquentiellement), on utilise asyncio.gather() qui attend qu’elles terminent toutes et renvoie leurs résultats dans l’ordre des arguments.
async def fetch(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return response.json()
async def main():
urls = [
"https://api.example.com/users/1",
"https://api.example.com/users/2",
"https://api.example.com/users/3",
]
# Séquentiel — 3× la latence
for url in urls:
donnees = await fetch(url)
print(donnees)
# Parallèle — max(latence) au lieu de somme
resultats = await asyncio.gather(*[fetch(u) for u in urls])
for r in resultats:
print(r)
La règle d’or : gather pour des appels indépendants, boucle for await pour des appels qui dépendent les uns des autres. Une erreur typique consiste à await en série dans un for ce qu’on pourrait faire en parallèle, ce qui transforme un appel de 200 ms en 2 secondes pour 10 URLs. Toujours se demander : « ces appels dépendent-ils du résultat précédent ? » Si non, paralléliser.
Le défaut de gather : si une coroutine lève une exception, les autres continuent en arrière-plan jusqu’à leur fin avant que l’exception remonte. Pour annuler le tout dès la première erreur, l’option return_exceptions=True capture les exceptions sous forme de valeurs (plus de propagation) ou TaskGroup annule proprement les sœurs.
Étape 3 — TaskGroup : structured concurrency depuis Python 3.11
Introduit en Python 3.11, asyncio.TaskGroup apporte la structured concurrency : un bloc async with qui garantit qu’aucune tâche ne survit au-delà de sa portée. Si l’une lève, les autres sont annulées proprement et l’exception remonte sous forme d’ExceptionGroup. Pour tout nouveau service en 3.11+, c’est le bon défaut, plus sûr que gather brut.
async def fetch_or_fail(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return response.json()
async def main():
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch_or_fail("https://api.example.com/users/1"))
t2 = tg.create_task(fetch_or_fail("https://api.example.com/users/2"))
t3 = tg.create_task(fetch_or_fail("https://api.example.com/users/3"))
# À la sortie du with, toutes les tâches ont terminé OU été annulées
print(t1.result(), t2.result(), t3.result())
except* httpx.HTTPStatusError as eg:
for exc in eg.exceptions:
logging.error("Échec HTTP : %s", exc)
Deux nouveautés syntaxiques importantes. tg.create_task() remplace asyncio.create_task() pour rester dans le périmètre du groupe. except* (avec étoile, depuis 3.11) match les exceptions individuelles à l’intérieur d’un ExceptionGroup. Cette structure rend visible une réalité d’asyncio : plusieurs tâches peuvent lever simultanément, et il faut un mécanisme pour les inspecter toutes, pas seulement la première.
Étape 4 — Timeouts et annulation propre
Tout appel réseau peut traîner indéfiniment si l’autre bout ne répond plus. asyncio.timeout(), stable depuis 3.11, est le mécanisme moderne pour borner la durée d’une opération. Il fonctionne comme un gestionnaire de contexte et lève TimeoutError à la fin du délai si la coroutine n’a pas terminé.
async def fetch_avec_timeout(url: str) -> dict | None:
try:
async with asyncio.timeout(5.0):
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
except TimeoutError:
logging.warning("Timeout sur %s", url)
return None
except asyncio.CancelledError:
# Toujours propager CancelledError pour respecter la chaîne d'annulation
raise
Trois disciplines à retenir. Toujours capturer TimeoutError séparément pour un comportement de repli explicite (retry, valeur par défaut, log d’alerte). Ne JAMAIS attraper asyncio.CancelledError sans le re-lever : elle signale que la coroutine doit s’arrêter, et l’avaler casse la chaîne d’annulation du TaskGroup parent. Bornez TOUS les appels réseau — un service sans timeout finit invariablement avec des coroutines orphelines qui consomment de la mémoire.
Étape 5 — httpx asynchrone
httpx est la bibliothèque HTTP asynchrone de référence en 2026 (équivalent moderne de requests, qui reste synchrone). Elle supporte HTTP/2 (via l’extra httpx[h2] qui installe la bibliothèque h2), connection pooling, timeouts granulaires et s’intègre nativement avec asyncio.
# Réutiliser un client pour bénéficier du connection pooling
class ApiClient:
def __init__(self, base_url: str):
self._client = httpx.AsyncClient(
base_url=base_url,
timeout=httpx.Timeout(10.0, connect=5.0),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
http2=True
)
async def fetch_user(self, user_id: int) -> dict:
response = await self._client.get(f"/users/{user_id}")
response.raise_for_status()
return response.json()
async def close(self):
await self._client.aclose()
# Usage avec gestionnaire de contexte
async def main():
client = ApiClient("https://api.example.com")
try:
user = await client.fetch_user(42)
finally:
await client.close()
Le gain perf est substantiel : créer un nouveau AsyncClient par requête détruit l’avantage du pooling. Dans un service FastAPI, on injecte un singleton ApiClient via Dependency Injection au démarrage. Pour des scripts ponctuels, le async with httpx.AsyncClient() as client: reste pratique.
Étape 6 — Bases de données asynchrones
Côté PostgreSQL, asyncpg est le driver natif asynchrone le plus rapide. SQLAlchemy 2.0+ expose une API async via create_async_engine et AsyncSession, qui reste l’option recommandée pour les applications structurées. Pour SQLite, aiosqlite wrap le module sqlite3 standard.
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600
)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_user(user_id: int) -> User | None:
async with AsyncSessionLocal() as session:
result = await session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
Trois réglages cruciaux pour la production. pool_pre_ping=True teste la connexion avant chaque emprunt — coût négligeable, évite les erreurs sur connexion stale. pool_recycle=3600 recycle les connexions après 1 heure pour contourner les coupures côté PG (idle timeout). expire_on_commit=False évite que SQLAlchemy invalide les objets après commit, ce qui forcerait une nouvelle requête à chaque accès d’attribut.
Étape 7 — Workers et patterns producteur/consommateur
Pour traiter un flux de tâches en arrière-plan (envoi d’emails, calculs lourds, synchros), le pattern producer/consumer avec asyncio.Queue structure proprement le flot. Un producteur dépose dans la queue, N consommateurs vident en parallèle, et un signal de fin propre via None sentinelle ou queue.join() garantit qu’aucune tâche n’est perdue.
async def worker(name: str, queue: asyncio.Queue):
while True:
tache = await queue.get()
if tache is None:
queue.task_done()
break
try:
await traiter(tache)
except Exception:
logging.exception("Erreur worker %s sur %s", name, tache)
finally:
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=100)
async with asyncio.TaskGroup() as tg:
# 5 consommateurs en parallèle
workers = [tg.create_task(worker(f"w{i}", queue)) for i in range(5)]
# Producteur
for tache in source_de_taches():
await queue.put(tache)
# Sentinelles pour arrêter chaque worker
for _ in workers:
await queue.put(None)
await queue.join()
Le maxsize=100 applique du backpressure : si les workers traînent, le producteur attend que la queue se vide. Ce mécanisme protège la mémoire du processus en cas de surcharge brutale. Le try/finally queue.task_done() garantit que queue.join() détecte la fin même si un worker explose sur une tâche.
Étape 8 — Observabilité minimale
Un service asyncio en production a besoin de trois signaux : logs structurés, métriques (Prometheus), traces distribuées (OpenTelemetry). Le minimum vital : structlog pour des logs JSON, prometheus_client pour exposer des compteurs et histogrammes, et le SDK OpenTelemetry pour tracer les appels HTTP et DB.
import structlog
from prometheus_client import Counter, Histogram, start_http_server
logger = structlog.get_logger()
REQUESTS = Counter("api_requests_total", "Requêtes API", ["endpoint", "status"])
LATENCY = Histogram("api_latency_seconds", "Latence API", ["endpoint"])
async def fetch_traced(url: str):
start = asyncio.get_running_loop().time()
try:
async with httpx.AsyncClient() as client:
response = await client.get(url)
REQUESTS.labels(endpoint=url, status=str(response.status_code)).inc()
return response.json()
finally:
LATENCY.labels(endpoint=url).observe(asyncio.get_running_loop().time() - start)
# Au démarrage du service, exposer /metrics sur le port 9090
start_http_server(9090)
Avec ces trois lignes, Prometheus peut scraper les métriques et Grafana afficher latence p50/p95/p99 par endpoint. Pour OpenTelemetry, les auto-instrumentations de httpx et asyncpg tracent automatiquement chaque opération sans toucher au code métier.
Erreurs fréquentes
| Symptôme | Cause | Solution |
|---|---|---|
| « Cannot run the event loop while another loop is running » | asyncio.run() imbriqué | Un seul asyncio.run() par processus ; depuis du sync, utiliser asyncio.run_coroutine_threadsafe |
| Coroutine non awaitée silencieuse | asyncio.create_task() sans référence gardée | Garder la référence ou utiliser TaskGroup |
| Service qui hang à l’arrêt | CancelledError avalée dans un except | Toujours re-raise CancelledError |
| Lenteur en parallèle | Appel CPU-bound bloquant l’event loop | Déléguer via run_in_executor ou processus séparé |
| Mémoire qui croît | Coroutines orphelines sans timeout | Borner systématiquement avec asyncio.timeout() |
| Pool de connexions épuisé | Sessions DB non fermées | Toujours async with AsyncSession() as session |
Foire aux questions
asyncio ou threads ?
asyncio pour I/O-bound (réseau, disque). Threads pour blocs CPU légers ou bibliothèques synchrones inévitables. Pour CPU lourd, processus séparés.
Faut-il uvloop ?
uvloop remplace l’event loop native par une implémentation libuv 2-4× plus rapide. Recommandé en production sur Linux/macOS pour les services à fort débit.
Comment intégrer du code synchrone ?await asyncio.to_thread(func, *args) exécute func dans un thread sans bloquer la loop. Pour du CPU-bound, loop.run_in_executor(ProcessPoolExecutor(), func).
FastAPI ou Starlette pur ?
FastAPI pour la validation Pydantic et la doc OpenAPI gratuite. Starlette pur pour des middlewares ASGI très spécifiques sans surcharge.
Comment tester du code asyncio ?
pytest-asyncio en mode auto, comme détaillé dans le tutoriel Pytest avancé.
Quel taux de concurrence cibler ?
Pour un service HTTP simple sur 1 vCPU : 200-500 connexions simultanées. Avec uvloop et httpx HTTP/2, on monte à plusieurs milliers. Toujours mesurer en charge réelle.
Pour aller plus loin
La couche asynchrone en place, l’étape suivante consiste à structurer la validation des données échangées avec Pydantic v2, indispensable pour les APIs et les configurations. Pour la vue panoramique, voir le guide principal Python.