Когда Event Loop в asyncio может быть заблокирован?

Ответ

Event Loop в asyncio может быть заблокирован, если внутри асинхронной функции (корутины) выполняется синхронный (блокирующий) код, который не отдает управление обратно в цикл событий. Это приводит к тому, что весь Event Loop перестает обрабатывать другие задачи, вызывая "зависание" приложения.

Основные причины блокировки Event Loop:

  • Длительные CPU-bound операции: Вычисления, которые занимают много процессорного времени без использования await или передачи управления. Например, сложные математические расчеты, обработка больших объемов данных в одном потоке.
  • Синхронные I/O операции: Использование стандартных блокирующих функций ввода/вывода, таких как:
    • Чтение/запись файлов без асинхронных библиотек (например, open() вместо aiofiles).
    • Сетевые запросы с использованием блокирующих библиотек (например, requests вместо aiohttp).
    • Доступ к базам данных с использованием синхронных драйверов.
  • Использование time.sleep(): Вместо асинхронного await asyncio.sleep(), который корректно отдает управление Event Loop.

Пример блокировки:

import asyncio
import time

async def blocking_task():
    print(f"[{time.time():.2f}] Blocking task started...")
    time.sleep(3)  # <-- Это блокирует Event Loop на 3 секунды
    print(f"[{time.time():.2f}] Blocking task finished.")

async def non_blocking_task():
    for i in range(3):
        print(f"[{time.time():.2f}] Non-blocking task running ({i+1}/3)")
        await asyncio.sleep(1) # Отдает управление Event Loop

async def main():
    print(f"[{time.time():.2f}] Main started.")
    await asyncio.gather(
        blocking_task(),
        non_blocking_task()
    )
    print(f"[{time.time():.2f}] Main finished.")

if __name__ == "__main__":
    asyncio.run(main())

В этом примере non_blocking_task не сможет выполняться параллельно с blocking_task, пока time.sleep(3) не завершится.

Решения для предотвращения блокировки:

  1. Использовать await для всех асинхронных операций: Убедитесь, что все I/O-операции и другие потенциально блокирующие вызовы используют await с соответствующими асинхронными библиотеками (например, aiohttp, aiofiles, asyncpg).
  2. Выносить блокирующий код в отдельные потоки/процессы: Для CPU-bound операций или использования синхронных библиотек, которые не имеют асинхронных аналогов, используйте loop.run_in_executor(). Это позволяет выполнять блокирующий код в отдельном пуле потоков (или процессов), не блокируя основной Event Loop.

    import asyncio
    import time
    import concurrent.futures # Для ThreadPoolExecutor
    
    def sync_blocking_function():
        print(f"[{time.time():.2f}] Sync blocking function started...")
        time.sleep(3)
        print(f"[{time.time():.2f}] Sync blocking function finished.")
        return "Result from sync function"
    
    async def async_wrapper():
        loop = asyncio.get_running_loop()
        # Выполняем блокирующую функцию в отдельном потоке
        result = await loop.run_in_executor(
            None, # Использует default ThreadPoolExecutor
            sync_blocking_function
        )
        print(f"[{time.time():.2f}] Async wrapper received: {result}")
    
    async def main_executor():
        print(f"[{time.time():.2f}] Main executor started.")
        await asyncio.gather(
            async_wrapper(),
            non_blocking_task() # Из предыдущего примера
        )
        print(f"[{time.time():.2f}] Main executor finished.")
    
    if __name__ == "__main__":
        asyncio.run(main_executor())

    В этом случае sync_blocking_function выполняется в отдельном потоке, и non_blocking_task может продолжать работу, не дожидаясь ее завершения.

Ответ 18+ 🔞

Слушай, а вот этот твой Event Loop в asyncio — он же как бабка у подъезда, которая всех видит и всем раздает поручения. А представь, что одна из этих асинхронных функций, корутина, значит, взяла и встала посреди этого круговорота, уперлась рогом и никуда не двигается. Весь этот движняк, блядь, встает колом! Это и есть блокировка, ёпта.

Из-за чего обычно вся движуха накрывается медным тазом?

  • Тяжелые вычисления, на которые процессор овердохуища времени тратит. Ну, типа, посчитать миллион цифр пи или JSON на три гигабайта распарсить в одном потоке. Ни одного await — вот и сидит цикл, как дурак, ждет.
  • Синхронный ввод-вывод, старый, блокирующий. Это когда вместо асинхронной библиотеки лезешь в файл обычным open() и читаешь, или в сеть стучишься через requests. Цикл событий, сука, так и замирает в ожидании.
  • Обычный time.sleep(). Да-да, вместо того чтобы вежливо сказать await asyncio.sleep(1) и отпустить управление, функция берет и засыпает насмерть, всех за собой таща. Хуй с горы, короче.

Смотри, как это выглядит в живую, прям пиздец наглядный:

import asyncio
import time

async def blocking_task():
    print(f"[{time.time():.2f}] Blocking task started...")
    time.sleep(3)  # <-- Вот эта падла! Три секунды всех тормозит!
    print(f"[{time.time():.2f}] Blocking task finished.")

async def non_blocking_task():
    for i in range(3):
        print(f"[{time.time():.2f}] Non-blocking task running ({i+1}/3)")
        await asyncio.sleep(1) # А вот этот молодец, управление отдает
    print(f"[{time.time():.2f}] Non-blocking task finished.")

async def main():
    print(f"[{time.time():.2f}] Main started.")
    await asyncio.gather(
        blocking_task(),
        non_blocking_task()
    )
    print(f"[{time.time():.2f}] Main finished.")

if __name__ == "__main__":
    asyncio.run(main())

Запустишь — и увидишь, что non_blocking_task простаивает, как идиот, все три секунды, пока time.sleep(3) не отмучается. Весь параллелизм, блядь, коту под хвост.

Что делать, чтобы не было мучительно больно?

  1. Везде, где можно, совать await. Сеть — aiohttp, файлы — aiofiles, базы — asyncpg. Не надо изобретать велосипед, ебать его в сраку.
  2. А если нельзя? Ну, legacy-код, библиотека старая, CPU-bound задача... Тогда, чувак, выноси это дерьмо в отдельный поток! Для этого есть loop.run_in_executor(). Кидаешь туда свою блокирующую функцию, и она варится в сторонке, не мешая основному циклу гонять другие задачи.

    import asyncio
    import time
    import concurrent.futures
    
    def sync_blocking_function():
        print(f"[{time.time():.2f}] Sync blocking function started...")
        time.sleep(3)
        print(f"[{time.time():.2f}] Sync blocking function finished.")
        return "Result from sync function"
    
    async def async_wrapper():
        loop = asyncio.get_running_loop()
        # Отправляем тяжёлую синхронную хуйню в отдельный поток
        result = await loop.run_in_executor(
            None, # Берет дефолтный ThreadPoolExecutor
            sync_blocking_function
        )
        print(f"[{time.time():.2f}] Async wrapper received: {result}")
    
    async def main_executor():
        print(f"[{time.time():.2f}] Main executor started.")
        await asyncio.gather(
            async_wrapper(),
            non_blocking_task() # Та самая неблокирующая задача
        )
        print(f"[{time.time():.2f}] Main executor finished.")
    
    if __name__ == "__main__":
        asyncio.run(main_executor())

    Вот теперь красота! sync_blocking_function пошла в отдельный поток, а non_blocking_task спокойно себе тикает каждую секунду в основном цикле. Никаких зависаний, всё летает. Главное — не путать, где асинхронно, а где нужно в поток вынести, а то получится каша, блядь, из топора.