🔝 الدليل الرئيسي للسلسلة: بايثون: لغة ومنظومة وأطر للمطوّرين
أصبح asyncio في 2026 الأساس الافتراضي لأي خدمة Python يجب أن تُدير I/O متزامنًا على نطاق كبير: API REST، عمّال خلفيون، scrapers، مجمّعات WebSockets، خطوط أنابيب البيانات. الوحدة القياسية asyncio تطوّرت كثيرًا منذ Python 3.7 وتستفيد في 3.11+ من TaskGroup (التزامن المُهيكَل) وasyncio.timeout() اللذين يُبسّطان جذريًا إدارة الأخطاء والمهل. يستعرض هذا الدرس اللبنات الأساسية لبناء خدمة غير متزامنة لا تتعطّل في الإنتاج: event loop مُهيَّأة جيدًا، coroutines منضبطة، إدارة دقيقة للإلغاءات، تكامل HTTP وقاعدة البيانات، والملاحظة.
المتطلبات
- Python 3.14.5 (مُوصى به) أو 3.13.13 (راجع تثبيت Python 3)
- أساسيات الـ coroutines، الكلمتان
asyncوawait - مشروع Python بـ pyproject.toml
- الوقت المُقدَّر: 90 دقيقة
الخطوة 1 — تشغيل event loop بشكل صحيح
برنامج asyncio يُختزَل، ظاهريًا، إلى نقطة دخول asyncio.run(main()). تحت الغطاء، تُنشئ هذه الدالة event loop، تُنفّذ الكوروتين الرئيسي حتى الانتهاء، ثم تُغلق loop بنظافة.
import asyncio
import logging
async def main():
logging.info("Service démarré")
await travailler()
logging.info("Service arrêté proprement")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
try:
asyncio.run(main())
except KeyboardInterrupt:
logging.info("Arrêt sur Ctrl+C")
ثلاث نقاط دقيقة لكنها حاسمة. asyncio.run() لا يتسامح إلا مع استدعاء واحد لكل عملية، مما يفرض انضباطًا هيكليًا. KeyboardInterrupt يُلتقَط لتسجيل الإيقاف. على Windows، يختار asyncio تلقائيًا ProactorEventLoop؛ على Linux وmacOS، SelectorEventLoop.
الخطوة 2 — Coroutines وawait وasyncio.gather
الـ coroutine تُعرَّف بـ async def. إنها كسولة: جسمها لا يُنفَّذ إلا عند await. لتشغيل عدة كوروتينات بالتوازي، نستخدم asyncio.gather().
async def fetch(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return response.json()
async def main():
urls = [
"https://api.example.com/users/1",
"https://api.example.com/users/2",
"https://api.example.com/users/3",
]
# Séquentiel — 3× la latence
for url in urls:
donnees = await fetch(url)
print(donnees)
# Parallèle — max(latence) au lieu de somme
resultats = await asyncio.gather(*[fetch(u) for u in urls])
for r in resultats:
print(r)
القاعدة الذهبية: gather للاستدعاءات المستقلة، حلقة for await للاستدعاءات المتبعِية. خطأ شائع: await تسلسليًا في حلقة لما يمكن فعله بالتوازي، فيتحوّل استدعاء 200 ms إلى ثانيتين لـ 10 URLs.
عيب gather: إذا رفعت إحدى الكوروتينات استثناءً، تستمر الأخريات في الخلفية. لإلغاء الكل عند أول خطأ، TaskGroup يُلغي الأشقاء بشكل نظيف.
الخطوة 3 — TaskGroup: structured concurrency منذ 3.11
يجلب asyncio.TaskGroup (Python 3.11+) التزامن المُهيكَل: كتلة async with تضمن أن لا تنجو أي مهمة خارج نطاقها. إذا رفعت إحداها، تُلغى الأخريات بنظافة ويصعد الاستثناء كـ ExceptionGroup.
async def fetch_or_fail(url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10.0)
response.raise_for_status()
return response.json()
async def main():
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch_or_fail("https://api.example.com/users/1"))
t2 = tg.create_task(fetch_or_fail("https://api.example.com/users/2"))
t3 = tg.create_task(fetch_or_fail("https://api.example.com/users/3"))
print(t1.result(), t2.result(), t3.result())
except* httpx.HTTPStatusError as eg:
for exc in eg.exceptions:
logging.error("Échec HTTP : %s", exc)
جديدتان نحويتان مهمتان. tg.create_task() يحل محل asyncio.create_task() للبقاء في نطاق المجموعة. except* (مع نجمة، منذ 3.11) يطابق الاستثناءات الفردية داخل ExceptionGroup.
الخطوة 4 — Timeouts والإلغاء النظيف
أي استدعاء شبكي قد يتأخر إلى الأبد. asyncio.timeout() مستقر منذ 3.11.
async def fetch_avec_timeout(url: str) -> dict | None:
try:
async with asyncio.timeout(5.0):
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
except TimeoutError:
logging.warning("Timeout sur %s", url)
return None
except asyncio.CancelledError:
# Toujours propager CancelledError pour respecter la chaîne d'annulation
raise
ثلاث انضباطات. التقاط TimeoutError منفصلًا. لا تلتقط أبدًا asyncio.CancelledError دون إعادة رفعها — تكسر سلسلة الإلغاء. حدّ كل استدعاءات الشبكة بمهلة.
الخطوة 5 — httpx غير المتزامن
httpx هو المكتبة المرجع لـ HTTP غير المتزامن في 2026. تدعم HTTP/2، connection pooling، مهل دقيقة.
class ApiClient:
def __init__(self, base_url: str):
self._client = httpx.AsyncClient(
base_url=base_url,
timeout=httpx.Timeout(10.0, connect=5.0),
limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
http2=True
)
async def fetch_user(self, user_id: int) -> dict:
response = await self._client.get(f"/users/{user_id}")
response.raise_for_status()
return response.json()
async def close(self):
await self._client.aclose()
async def main():
client = ApiClient("https://api.example.com")
try:
user = await client.fetch_user(42)
finally:
await client.close()
كسب الأداء جوهري: إنشاء AsyncClient جديد لكل طلب يُلغي ميزة pooling. في خدمة FastAPI، نحقن singleton ApiClient عبر Dependency Injection.
الخطوة 6 — قواعد البيانات غير المتزامنة
على جانب PostgreSQL، asyncpg هو driver الأسرع. SQLAlchemy 2.0+ يكشف API async عبر create_async_engine وAsyncSession. لـ SQLite، aiosqlite.
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=10,
max_overflow=20,
pool_pre_ping=True,
pool_recycle=3600
)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_user(user_id: int) -> User | None:
async with AsyncSessionLocal() as session:
result = await session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
ثلاثة إعدادات حاسمة للإنتاج. pool_pre_ping=True يختبر الاتصال قبل كل استعارة. pool_recycle=3600 يُعيد تدوير الاتصالات بعد ساعة. expire_on_commit=False يتجنّب أن يُبطل SQLAlchemy الكائنات بعد commit.
الخطوة 7 — Workers ونمط المُنتج/المُستهلك
لمعالجة تدفق مهام خلفية، نمط producer/consumer مع asyncio.Queue يُهيكل التدفق بنظافة.
async def worker(name: str, queue: asyncio.Queue):
while True:
tache = await queue.get()
if tache is None:
queue.task_done()
break
try:
await traiter(tache)
except Exception:
logging.exception("Erreur worker %s sur %s", name, tache)
finally:
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=100)
async with asyncio.TaskGroup() as tg:
workers = [tg.create_task(worker(f"w{i}", queue)) for i in range(5)]
for tache in source_de_taches():
await queue.put(tache)
for _ in workers:
await queue.put(None)
await queue.join()
maxsize=100 يُطبّق backpressure: إذا تأخّر العمّال، ينتظر المنتج. يحمي ذاكرة العملية من حمل مفاجئ. try/finally queue.task_done() يضمن أن queue.join() يكتشف النهاية حتى لو انفجر عامل.
الخطوة 8 — الملاحظة الدنيا
خدمة asyncio في الإنتاج تحتاج ثلاث إشارات: logs مُهيكَلة، metrics (Prometheus)، traces موزّعة (OpenTelemetry).
import structlog
from prometheus_client import Counter, Histogram, start_http_server
logger = structlog.get_logger()
REQUESTS = Counter("api_requests_total", "Requêtes API", ["endpoint", "status"])
LATENCY = Histogram("api_latency_seconds", "Latence API", ["endpoint"])
async def fetch_traced(url: str):
start = asyncio.get_running_loop().time()
try:
async with httpx.AsyncClient() as client:
response = await client.get(url)
REQUESTS.labels(endpoint=url, status=str(response.status_code)).inc()
return response.json()
finally:
LATENCY.labels(endpoint=url).observe(asyncio.get_running_loop().time() - start)
start_http_server(9090)
بهذه الأسطر، Prometheus يستطيع scraping وGrafana عرض p50/p95/p99 لكل endpoint. لـ OpenTelemetry، auto-instrumentations لـ httpx وasyncpg يُتتبّعان كل عملية تلقائيًا.
أخطاء شائعة
| العَرَض | السبب | الحل |
|---|---|---|
| « Cannot run the event loop while another loop is running » | asyncio.run() متداخل |
asyncio.run() واحد لكل عملية |
| Coroutine غير awaited صامتة | asyncio.create_task() بدون مرجع محفوظ |
احفظ المرجع أو استخدم TaskGroup |
| الخدمة تتجمّد عند الإيقاف | CancelledError تم التهامها | أعد رفع CancelledError دائمًا |
| بطء بالتوازي | استدعاء CPU-bound يحجب event loop | فوّض عبر run_in_executor |
| الذاكرة تنمو | Coroutines يتيمة بدون timeout | حدّ بـ asyncio.timeout() منهجيًا |
| Pool اتصالات منهك | Sessions DB غير مُغلَقة | دائمًا async with AsyncSession() |
الأسئلة الشائعة
asyncio أم threads؟
asyncio لـ I/O-bound. Threads لكتل CPU خفيفة. لـ CPU ثقيل، عمليات منفصلة.
هل نستخدم uvloop؟
uvloop يحل محل event loop الأصلية بتنفيذ libuv أسرع 2-4×. مُوصى به في الإنتاج على Linux/macOS.
كيف ندمج كود متزامن؟await asyncio.to_thread(func, *args) يُنفّذ func في thread دون حجب loop. لـ CPU-bound: loop.run_in_executor(ProcessPoolExecutor(), func).
FastAPI أم Starlette؟
FastAPI للتحقق Pydantic وOpenAPI المجاني. Starlette للـ middlewares ASGI خاصة جدًا.
كيف نختبر asyncio؟pytest-asyncio في وضع auto، كما هو مفصّل في Pytest المتقدم.
أي معدّل تزامن نستهدف؟
لخدمة HTTP بسيطة على vCPU واحد: 200-500 اتصال متزامن. مع uvloop وhttpx HTTP/2، آلاف. قِس دائمًا تحت الحمل الحقيقي.