Когда Event Loop в asyncio может быть заблокирован?

Ответ

Event Loop в asyncio может быть заблокирован, если внутри асинхронной функции (корутины) выполняется синхронный (блокирующий) код, который не отдает управление обратно в цикл событий. Это приводит к тому, что весь Event Loop перестает обрабатывать другие задачи, вызывая "зависание" приложения.

Основные причины блокировки Event Loop:

  • Длительные CPU-bound операции: Вычисления, которые занимают много процессорного времени без использования await или передачи управления. Например, сложные математические расчеты, обработка больших объемов данных в одном потоке.
  • Синхронные I/O операции: Использование стандартных блокирующих функций ввода/вывода, таких как:
    • Чтение/запись файлов без асинхронных библиотек (например, open() вместо aiofiles).
    • Сетевые запросы с использованием блокирующих библиотек (например, requests вместо aiohttp).
    • Доступ к базам данных с использованием синхронных драйверов.
  • Использование time.sleep(): Вместо асинхронного await asyncio.sleep(), который корректно отдает управление Event Loop.

Пример блокировки:

import asyncio
import time

async def blocking_task():
    print(f"[{time.time():.2f}] Blocking task started...")
    time.sleep(3)  # <-- Это блокирует Event Loop на 3 секунды
    print(f"[{time.time():.2f}] Blocking task finished.")

async def non_blocking_task():
    for i in range(3):
        print(f"[{time.time():.2f}] Non-blocking task running ({i+1}/3)")
        await asyncio.sleep(1) # Отдает управление Event Loop

async def main():
    print(f"[{time.time():.2f}] Main started.")
    await asyncio.gather(
        blocking_task(),
        non_blocking_task()
    )
    print(f"[{time.time():.2f}] Main finished.")

if __name__ == "__main__":
    asyncio.run(main())

В этом примере non_blocking_task не сможет выполняться параллельно с blocking_task, пока time.sleep(3) не завершится.

Решения для предотвращения блокировки:

  1. Использовать await для всех асинхронных операций: Убедитесь, что все I/O-операции и другие потенциально блокирующие вызовы используют await с соответствующими асинхронными библиотеками (например, aiohttp, aiofiles, asyncpg).
  2. Выносить блокирующий код в отдельные потоки/процессы: Для CPU-bound операций или использования синхронных библиотек, которые не имеют асинхронных аналогов, используйте loop.run_in_executor(). Это позволяет выполнять блокирующий код в отдельном пуле потоков (или процессов), не блокируя основной Event Loop.

    import asyncio
    import time
    import concurrent.futures # Для ThreadPoolExecutor
    
    def sync_blocking_function():
        print(f"[{time.time():.2f}] Sync blocking function started...")
        time.sleep(3)
        print(f"[{time.time():.2f}] Sync blocking function finished.")
        return "Result from sync function"
    
    async def async_wrapper():
        loop = asyncio.get_running_loop()
        # Выполняем блокирующую функцию в отдельном потоке
        result = await loop.run_in_executor(
            None, # Использует default ThreadPoolExecutor
            sync_blocking_function
        )
        print(f"[{time.time():.2f}] Async wrapper received: {result}")
    
    async def main_executor():
        print(f"[{time.time():.2f}] Main executor started.")
        await asyncio.gather(
            async_wrapper(),
            non_blocking_task() # Из предыдущего примера
        )
        print(f"[{time.time():.2f}] Main executor finished.")
    
    if __name__ == "__main__":
        asyncio.run(main_executor())

    В этом случае sync_blocking_function выполняется в отдельном потоке, и non_blocking_task может продолжать работу, не дожидаясь ее завершения.