Ответ
Event Loop в asyncio может быть заблокирован, если внутри асинхронной функции (корутины) выполняется синхронный (блокирующий) код, который не отдает управление обратно в цикл событий. Это приводит к тому, что весь Event Loop перестает обрабатывать другие задачи, вызывая "зависание" приложения.
Основные причины блокировки Event Loop:
- Длительные CPU-bound операции: Вычисления, которые занимают много процессорного времени без использования
awaitили передачи управления. Например, сложные математические расчеты, обработка больших объемов данных в одном потоке. - Синхронные I/O операции: Использование стандартных блокирующих функций ввода/вывода, таких как:
- Чтение/запись файлов без асинхронных библиотек (например,
open()вместоaiofiles). - Сетевые запросы с использованием блокирующих библиотек (например,
requestsвместоaiohttp). - Доступ к базам данных с использованием синхронных драйверов.
- Чтение/запись файлов без асинхронных библиотек (например,
- Использование
time.sleep(): Вместо асинхронногоawait asyncio.sleep(), который корректно отдает управление Event Loop.
Пример блокировки:
import asyncio
import time
async def blocking_task():
print(f"[{time.time():.2f}] Blocking task started...")
time.sleep(3) # <-- Это блокирует Event Loop на 3 секунды
print(f"[{time.time():.2f}] Blocking task finished.")
async def non_blocking_task():
for i in range(3):
print(f"[{time.time():.2f}] Non-blocking task running ({i+1}/3)")
await asyncio.sleep(1) # Отдает управление Event Loop
async def main():
print(f"[{time.time():.2f}] Main started.")
await asyncio.gather(
blocking_task(),
non_blocking_task()
)
print(f"[{time.time():.2f}] Main finished.")
if __name__ == "__main__":
asyncio.run(main())
В этом примере non_blocking_task не сможет выполняться параллельно с blocking_task, пока time.sleep(3) не завершится.
Решения для предотвращения блокировки:
- Использовать
awaitдля всех асинхронных операций: Убедитесь, что все I/O-операции и другие потенциально блокирующие вызовы используютawaitс соответствующими асинхронными библиотеками (например,aiohttp,aiofiles,asyncpg). -
Выносить блокирующий код в отдельные потоки/процессы: Для CPU-bound операций или использования синхронных библиотек, которые не имеют асинхронных аналогов, используйте
loop.run_in_executor(). Это позволяет выполнять блокирующий код в отдельном пуле потоков (или процессов), не блокируя основной Event Loop.import asyncio import time import concurrent.futures # Для ThreadPoolExecutor def sync_blocking_function(): print(f"[{time.time():.2f}] Sync blocking function started...") time.sleep(3) print(f"[{time.time():.2f}] Sync blocking function finished.") return "Result from sync function" async def async_wrapper(): loop = asyncio.get_running_loop() # Выполняем блокирующую функцию в отдельном потоке result = await loop.run_in_executor( None, # Использует default ThreadPoolExecutor sync_blocking_function ) print(f"[{time.time():.2f}] Async wrapper received: {result}") async def main_executor(): print(f"[{time.time():.2f}] Main executor started.") await asyncio.gather( async_wrapper(), non_blocking_task() # Из предыдущего примера ) print(f"[{time.time():.2f}] Main executor finished.") if __name__ == "__main__": asyncio.run(main_executor())В этом случае
sync_blocking_functionвыполняется в отдельном потоке, иnon_blocking_taskможет продолжать работу, не дожидаясь ее завершения.
Ответ 18+ 🔞
Слушай, а вот этот твой Event Loop в asyncio — он же как бабка у подъезда, которая всех видит и всем раздает поручения. А представь, что одна из этих асинхронных функций, корутина, значит, взяла и встала посреди этого круговорота, уперлась рогом и никуда не двигается. Весь этот движняк, блядь, встает колом! Это и есть блокировка, ёпта.
Из-за чего обычно вся движуха накрывается медным тазом?
- Тяжелые вычисления, на которые процессор овердохуища времени тратит. Ну, типа, посчитать миллион цифр пи или JSON на три гигабайта распарсить в одном потоке. Ни одного
await— вот и сидит цикл, как дурак, ждет. - Синхронный ввод-вывод, старый, блокирующий. Это когда вместо асинхронной библиотеки лезешь в файл обычным
open()и читаешь, или в сеть стучишься черезrequests. Цикл событий, сука, так и замирает в ожидании. - Обычный
time.sleep(). Да-да, вместо того чтобы вежливо сказатьawait asyncio.sleep(1)и отпустить управление, функция берет и засыпает насмерть, всех за собой таща. Хуй с горы, короче.
Смотри, как это выглядит в живую, прям пиздец наглядный:
import asyncio
import time
async def blocking_task():
print(f"[{time.time():.2f}] Blocking task started...")
time.sleep(3) # <-- Вот эта падла! Три секунды всех тормозит!
print(f"[{time.time():.2f}] Blocking task finished.")
async def non_blocking_task():
for i in range(3):
print(f"[{time.time():.2f}] Non-blocking task running ({i+1}/3)")
await asyncio.sleep(1) # А вот этот молодец, управление отдает
print(f"[{time.time():.2f}] Non-blocking task finished.")
async def main():
print(f"[{time.time():.2f}] Main started.")
await asyncio.gather(
blocking_task(),
non_blocking_task()
)
print(f"[{time.time():.2f}] Main finished.")
if __name__ == "__main__":
asyncio.run(main())
Запустишь — и увидишь, что non_blocking_task простаивает, как идиот, все три секунды, пока time.sleep(3) не отмучается. Весь параллелизм, блядь, коту под хвост.
Что делать, чтобы не было мучительно больно?
- Везде, где можно, совать
await. Сеть —aiohttp, файлы —aiofiles, базы —asyncpg. Не надо изобретать велосипед, ебать его в сраку. -
А если нельзя? Ну, legacy-код, библиотека старая, CPU-bound задача... Тогда, чувак, выноси это дерьмо в отдельный поток! Для этого есть
loop.run_in_executor(). Кидаешь туда свою блокирующую функцию, и она варится в сторонке, не мешая основному циклу гонять другие задачи.import asyncio import time import concurrent.futures def sync_blocking_function(): print(f"[{time.time():.2f}] Sync blocking function started...") time.sleep(3) print(f"[{time.time():.2f}] Sync blocking function finished.") return "Result from sync function" async def async_wrapper(): loop = asyncio.get_running_loop() # Отправляем тяжёлую синхронную хуйню в отдельный поток result = await loop.run_in_executor( None, # Берет дефолтный ThreadPoolExecutor sync_blocking_function ) print(f"[{time.time():.2f}] Async wrapper received: {result}") async def main_executor(): print(f"[{time.time():.2f}] Main executor started.") await asyncio.gather( async_wrapper(), non_blocking_task() # Та самая неблокирующая задача ) print(f"[{time.time():.2f}] Main executor finished.") if __name__ == "__main__": asyncio.run(main_executor())Вот теперь красота!
sync_blocking_functionпошла в отдельный поток, аnon_blocking_taskспокойно себе тикает каждую секунду в основном цикле. Никаких зависаний, всё летает. Главное — не путать, где асинхронно, а где нужно в поток вынести, а то получится каша, блядь, из топора.