Как в Python ограничить одновременный доступ к ресурсу N потоками

«Как в Python ограничить одновременный доступ к ресурсу N потоками» — вопрос из категории Архитектура, который задают на 10% собеседований Python Разработчик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Для ограничения одновременного доступа к общему ресурсу в многопоточной среде в Python используется примитив синхронизации threading.Semaphore.

Семафор поддерживает внутренний счетчик, который уменьшается при каждом вызове acquire() и увеличивается при вызове release(). Если счетчик равен нулю, поток, вызывающий acquire(), блокируется до тех пор, пока другой поток не вызовет release().

Пример с threading.Semaphore:

import threading
import time

class ResourceLimiter:
    def __init__(self, max_concurrent: int = 5):
        # Инициализируем семафор, разрешая одновременный доступ не более чем 5 потокам
        self.semaphore = threading.Semaphore(max_concurrent)

    def access_resource(self, thread_name: str):
        print(f"[{thread_name}] пытается получить доступ...")
        # Контекстный менеджер `with` автоматически вызывает acquire() в начале
        # и release() в конце блока, даже если возникнет исключение.
        with self.semaphore:
            print(f"[{thread_name}] получил доступ. Работа с ресурсом...")
            time.sleep(2) # Имитация работы
            print(f"[{thread_name}] освобождает ресурс.")

# Пример использования
limiter = ResourceLimiter(max_concurrent=3)
threads = []

# Запускаем 10 потоков, но только 3 смогут работать одновременно
for i in range(10):
    thread_name = f"Thread-{i+1}"
    t = threading.Thread(target=limiter.access_resource, args=(thread_name,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Альтернатива для асинхронного кода

В асинхронных приложениях (с использованием asyncio) следует использовать asyncio.Semaphore для тех же целей. Принцип работы аналогичен, но он не блокирует весь поток выполнения.

import asyncio

async def access_resource(semaphore, name):
    async with semaphore:
        print(f"[{name}] получил доступ...")
        await asyncio.sleep(1)
        print(f"[{name}] освобождает ресурс.")

async def main():
    semaphore = asyncio.Semaphore(3)
    tasks = [access_resource(semaphore, f"Task-{i}") for i in range(10)]
    await asyncio.gather(*tasks)

# asyncio.run(main())