Как в Python ограничить одновременный доступ к ресурсу N потоками

Ответ

Для ограничения одновременного доступа к общему ресурсу в многопоточной среде в Python используется примитив синхронизации threading.Semaphore.

Семафор поддерживает внутренний счетчик, который уменьшается при каждом вызове acquire() и увеличивается при вызове release(). Если счетчик равен нулю, поток, вызывающий acquire(), блокируется до тех пор, пока другой поток не вызовет release().

Пример с threading.Semaphore:

import threading
import time

class ResourceLimiter:
    def __init__(self, max_concurrent: int = 5):
        # Инициализируем семафор, разрешая одновременный доступ не более чем 5 потокам
        self.semaphore = threading.Semaphore(max_concurrent)

    def access_resource(self, thread_name: str):
        print(f"[{thread_name}] пытается получить доступ...")
        # Контекстный менеджер `with` автоматически вызывает acquire() в начале
        # и release() в конце блока, даже если возникнет исключение.
        with self.semaphore:
            print(f"[{thread_name}] получил доступ. Работа с ресурсом...")
            time.sleep(2) # Имитация работы
            print(f"[{thread_name}] освобождает ресурс.")

# Пример использования
limiter = ResourceLimiter(max_concurrent=3)
threads = []

# Запускаем 10 потоков, но только 3 смогут работать одновременно
for i in range(10):
    thread_name = f"Thread-{i+1}"
    t = threading.Thread(target=limiter.access_resource, args=(thread_name,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Альтернатива для асинхронного кода

В асинхронных приложениях (с использованием asyncio) следует использовать asyncio.Semaphore для тех же целей. Принцип работы аналогичен, но он не блокирует весь поток выполнения.

import asyncio

async def access_resource(semaphore, name):
    async with semaphore:
        print(f"[{name}] получил доступ...")
        await asyncio.sleep(1)
        print(f"[{name}] освобождает ресурс.")

async def main():
    semaphore = asyncio.Semaphore(3)
    tasks = [access_resource(semaphore, f"Task-{i}") for i in range(10)]
    await asyncio.gather(*tasks)

# asyncio.run(main())

Ответ 18+ 🔞

Да ты посмотри, что творится! Ну прям как в нашей общаге, когда на всех один чайник, а желающих — дохуя. Так вот, чтобы не устроить драку за кипяток, в Python есть штука под названием threading.Semaphore.

Представь себе, это такой счетчик-турникет, блядь. Внутри у него циферка. Каждый поток, который хочет пройти к общему ресурсу (нашему чайнику), должен сказать acquire() — и циферка уменьшается на единичку. Прошел — молодец. А когда отвалил, кричит release() — и циферка обратно подрастает. И если эта циферка упала в ноль, следующий желающий просто тупо ждет, пока кто-то не освободит место. Проще говоря, очередь, но без физической очереди, ёпта!

Вот смотри, как это выглядит в коде, тут всё честно:

import threading
import time

class ResourceLimiter:
    def __init__(self, max_concurrent: int = 5):
        # Делаем семафор. Допустим, к нашему "чайнику" могут подойти только 3 потока одновременно.
        # Больше — иди жди, пока кто-то не отпьёт, сука.
        self.semaphore = threading.Semaphore(max_concurrent)

    def access_resource(self, thread_name: str):
        print(f"[{thread_name}] пытается получить доступ...")
        # А вот это магия! Контекстный менеджер `with` сам всё сделает:
        # войдет — вызовет acquire(), выйдет — release(). Даже если посередине всё ебнется.
        with self.semaphore:
            print(f"[{thread_name}] получил доступ. Работа с ресурсом...")
            time.sleep(2) # Прикинься, что что-то делаешь, типа кипятишь воду
            print(f"[{thread_name}] освобождает ресурс.")

# Поехали!
limiter = ResourceLimiter(max_concurrent=3)
threads = []

# Запускаем аж 10 потоков-халявщиков, но семафор пропустит только троих за раз.
for i in range(10):
    thread_name = f"Thread-{i+1}"
    t = threading.Thread(target=limiter.access_resource, args=(thread_name,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

Выполнишь — увидишь, как первые три лезут, а остальные терпеливо, блядь, ждут своей очереди. Красота!

А если ты вдруг в асинхронном мире живешь, где все бегают по одной дорожке, но по очереди (asyncio), то там свой семафор — asyncio.Semaphore. Принцип тот же, только блокирует не поток, а корутину, пока другие работают.

import asyncio

async def access_resource(semaphore, name):
    async with semaphore:
        print(f"[{name}] получил доступ...")
        await asyncio.sleep(1)
        print(f"[{name}] освобождает ресурс.")

async def main():
    semaphore = asyncio.Semaphore(3)
    tasks = [access_resource(semaphore, f"Task-{i}") for i in range(10)]
    await asyncio.gather(*tasks)

# asyncio.run(main())

Вот и вся магия, ебать мои старые костыли. Никаких драк, всё цивильно. Главное — не забудь отпустить семафор, а то остальные так и будут ждать, а ты уже давно в сортире.