تطوير الويب

Asyncio في الإنتاج: event loop وTaskGroup والتزامن المُهيكَل

7 دقائق للقراءة

🔝 الدليل الرئيسي للسلسلة: بايثون: لغة ومنظومة وأطر للمطوّرين

أصبح asyncio في 2026 الأساس الافتراضي لأي خدمة Python يجب أن تُدير I/O متزامنًا على نطاق كبير: API REST، عمّال خلفيون، scrapers، مجمّعات WebSockets، خطوط أنابيب البيانات. الوحدة القياسية asyncio تطوّرت كثيرًا منذ Python 3.7 وتستفيد في 3.11+ من TaskGroup (التزامن المُهيكَل) وasyncio.timeout() اللذين يُبسّطان جذريًا إدارة الأخطاء والمهل. يستعرض هذا الدرس اللبنات الأساسية لبناء خدمة غير متزامنة لا تتعطّل في الإنتاج: event loop مُهيَّأة جيدًا، coroutines منضبطة، إدارة دقيقة للإلغاءات، تكامل HTTP وقاعدة البيانات، والملاحظة.

المتطلبات

  • Python 3.14.5 (مُوصى به) أو 3.13.13 (راجع تثبيت Python 3)
  • أساسيات الـ coroutines، الكلمتان async وawait
  • مشروع Python بـ pyproject.toml
  • الوقت المُقدَّر: 90 دقيقة

الخطوة 1 — تشغيل event loop بشكل صحيح

برنامج asyncio يُختزَل، ظاهريًا، إلى نقطة دخول asyncio.run(main()). تحت الغطاء، تُنشئ هذه الدالة event loop، تُنفّذ الكوروتين الرئيسي حتى الانتهاء، ثم تُغلق loop بنظافة.

import asyncio
import logging

async def main():
    logging.info("Service démarré")
    await travailler()
    logging.info("Service arrêté proprement")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Arrêt sur Ctrl+C")

ثلاث نقاط دقيقة لكنها حاسمة. asyncio.run() لا يتسامح إلا مع استدعاء واحد لكل عملية، مما يفرض انضباطًا هيكليًا. KeyboardInterrupt يُلتقَط لتسجيل الإيقاف. على Windows، يختار asyncio تلقائيًا ProactorEventLoop؛ على Linux وmacOS، SelectorEventLoop.

الخطوة 2 — Coroutines وawait وasyncio.gather

الـ coroutine تُعرَّف بـ async def. إنها كسولة: جسمها لا يُنفَّذ إلا عند await. لتشغيل عدة كوروتينات بالتوازي، نستخدم asyncio.gather().

async def fetch(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()

async def main():
    urls = [
        "https://api.example.com/users/1",
        "https://api.example.com/users/2",
        "https://api.example.com/users/3",
    ]
    # Séquentiel — 3× la latence
    for url in urls:
        donnees = await fetch(url)
        print(donnees)

    # Parallèle — max(latence) au lieu de somme
    resultats = await asyncio.gather(*[fetch(u) for u in urls])
    for r in resultats:
        print(r)

القاعدة الذهبية: gather للاستدعاءات المستقلة، حلقة for await للاستدعاءات المتبعِية. خطأ شائع: await تسلسليًا في حلقة لما يمكن فعله بالتوازي، فيتحوّل استدعاء 200 ms إلى ثانيتين لـ 10 URLs.

عيب gather: إذا رفعت إحدى الكوروتينات استثناءً، تستمر الأخريات في الخلفية. لإلغاء الكل عند أول خطأ، TaskGroup يُلغي الأشقاء بشكل نظيف.

الخطوة 3 — TaskGroup: structured concurrency منذ 3.11

يجلب asyncio.TaskGroup (Python 3.11+) التزامن المُهيكَل: كتلة async with تضمن أن لا تنجو أي مهمة خارج نطاقها. إذا رفعت إحداها، تُلغى الأخريات بنظافة ويصعد الاستثناء كـ ExceptionGroup.

async def fetch_or_fail(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(fetch_or_fail("https://api.example.com/users/1"))
            t2 = tg.create_task(fetch_or_fail("https://api.example.com/users/2"))
            t3 = tg.create_task(fetch_or_fail("https://api.example.com/users/3"))
        print(t1.result(), t2.result(), t3.result())
    except* httpx.HTTPStatusError as eg:
        for exc in eg.exceptions:
            logging.error("Échec HTTP : %s", exc)

جديدتان نحويتان مهمتان. tg.create_task() يحل محل asyncio.create_task() للبقاء في نطاق المجموعة. except* (مع نجمة، منذ 3.11) يطابق الاستثناءات الفردية داخل ExceptionGroup.

الخطوة 4 — Timeouts والإلغاء النظيف

أي استدعاء شبكي قد يتأخر إلى الأبد. asyncio.timeout() مستقر منذ 3.11.

async def fetch_avec_timeout(url: str) -> dict | None:
    try:
        async with asyncio.timeout(5.0):
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                return response.json()
    except TimeoutError:
        logging.warning("Timeout sur %s", url)
        return None
    except asyncio.CancelledError:
        # Toujours propager CancelledError pour respecter la chaîne d'annulation
        raise

ثلاث انضباطات. التقاط TimeoutError منفصلًا. لا تلتقط أبدًا asyncio.CancelledError دون إعادة رفعها — تكسر سلسلة الإلغاء. حدّ كل استدعاءات الشبكة بمهلة.

الخطوة 5 — httpx غير المتزامن

httpx هو المكتبة المرجع لـ HTTP غير المتزامن في 2026. تدعم HTTP/2، connection pooling، مهل دقيقة.

class ApiClient:
    def __init__(self, base_url: str):
        self._client = httpx.AsyncClient(
            base_url=base_url,
            timeout=httpx.Timeout(10.0, connect=5.0),
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
            http2=True
        )

    async def fetch_user(self, user_id: int) -> dict:
        response = await self._client.get(f"/users/{user_id}")
        response.raise_for_status()
        return response.json()

    async def close(self):
        await self._client.aclose()

async def main():
    client = ApiClient("https://api.example.com")
    try:
        user = await client.fetch_user(42)
    finally:
        await client.close()

كسب الأداء جوهري: إنشاء AsyncClient جديد لكل طلب يُلغي ميزة pooling. في خدمة FastAPI، نحقن singleton ApiClient عبر Dependency Injection.

الخطوة 6 — قواعد البيانات غير المتزامنة

على جانب PostgreSQL، asyncpg هو driver الأسرع. SQLAlchemy 2.0+ يكشف API async عبر create_async_engine وAsyncSession. لـ SQLite، aiosqlite.

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    pool_recycle=3600
)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_user(user_id: int) -> User | None:
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()

ثلاثة إعدادات حاسمة للإنتاج. pool_pre_ping=True يختبر الاتصال قبل كل استعارة. pool_recycle=3600 يُعيد تدوير الاتصالات بعد ساعة. expire_on_commit=False يتجنّب أن يُبطل SQLAlchemy الكائنات بعد commit.

الخطوة 7 — Workers ونمط المُنتج/المُستهلك

لمعالجة تدفق مهام خلفية، نمط producer/consumer مع asyncio.Queue يُهيكل التدفق بنظافة.

async def worker(name: str, queue: asyncio.Queue):
    while True:
        tache = await queue.get()
        if tache is None:
            queue.task_done()
            break
        try:
            await traiter(tache)
        except Exception:
            logging.exception("Erreur worker %s sur %s", name, tache)
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=100)
    async with asyncio.TaskGroup() as tg:
        workers = [tg.create_task(worker(f"w{i}", queue)) for i in range(5)]
        for tache in source_de_taches():
            await queue.put(tache)
        for _ in workers:
            await queue.put(None)
        await queue.join()

maxsize=100 يُطبّق backpressure: إذا تأخّر العمّال، ينتظر المنتج. يحمي ذاكرة العملية من حمل مفاجئ. try/finally queue.task_done() يضمن أن queue.join() يكتشف النهاية حتى لو انفجر عامل.

الخطوة 8 — الملاحظة الدنيا

خدمة asyncio في الإنتاج تحتاج ثلاث إشارات: logs مُهيكَلة، metrics (Prometheus)، traces موزّعة (OpenTelemetry).

import structlog
from prometheus_client import Counter, Histogram, start_http_server

logger = structlog.get_logger()
REQUESTS = Counter("api_requests_total", "Requêtes API", ["endpoint", "status"])
LATENCY = Histogram("api_latency_seconds", "Latence API", ["endpoint"])

async def fetch_traced(url: str):
    start = asyncio.get_running_loop().time()
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            REQUESTS.labels(endpoint=url, status=str(response.status_code)).inc()
            return response.json()
    finally:
        LATENCY.labels(endpoint=url).observe(asyncio.get_running_loop().time() - start)

start_http_server(9090)

بهذه الأسطر، Prometheus يستطيع scraping وGrafana عرض p50/p95/p99 لكل endpoint. لـ OpenTelemetry، auto-instrumentations لـ httpx وasyncpg يُتتبّعان كل عملية تلقائيًا.

أخطاء شائعة

العَرَض السبب الحل
« Cannot run the event loop while another loop is running » asyncio.run() متداخل asyncio.run() واحد لكل عملية
Coroutine غير awaited صامتة asyncio.create_task() بدون مرجع محفوظ احفظ المرجع أو استخدم TaskGroup
الخدمة تتجمّد عند الإيقاف CancelledError تم التهامها أعد رفع CancelledError دائمًا
بطء بالتوازي استدعاء CPU-bound يحجب event loop فوّض عبر run_in_executor
الذاكرة تنمو Coroutines يتيمة بدون timeout حدّ بـ asyncio.timeout() منهجيًا
Pool اتصالات منهك Sessions DB غير مُغلَقة دائمًا async with AsyncSession()

الأسئلة الشائعة

asyncio أم threads؟
asyncio لـ I/O-bound. Threads لكتل CPU خفيفة. لـ CPU ثقيل، عمليات منفصلة.

هل نستخدم uvloop؟
uvloop يحل محل event loop الأصلية بتنفيذ libuv أسرع 2-4×. مُوصى به في الإنتاج على Linux/macOS.

كيف ندمج كود متزامن؟
await asyncio.to_thread(func, *args) يُنفّذ func في thread دون حجب loop. لـ CPU-bound: loop.run_in_executor(ProcessPoolExecutor(), func).

FastAPI أم Starlette؟
FastAPI للتحقق Pydantic وOpenAPI المجاني. Starlette للـ middlewares ASGI خاصة جدًا.

كيف نختبر asyncio؟
pytest-asyncio في وضع auto، كما هو مفصّل في Pytest المتقدم.

أي معدّل تزامن نستهدف؟
لخدمة HTTP بسيطة على vCPU واحد: 200-500 اتصال متزامن. مع uvloop وhttpx HTTP/2، آلاف. قِس دائمًا تحت الحمل الحقيقي.

مقالات ذات صلة

Sponsoriser ce contenu

Cet emplacement est à vous

Position premium en fin d'article — c'est l'instant où les lecteurs sont le plus engagés. Réservez cet espace pour votre marque, votre formation ou votre offre.

Recevoir nos tarifs
Publicité