Ответ
Для ограничения одновременного доступа к общему ресурсу в многопоточной среде в Python используется примитив синхронизации threading.Semaphore.
Семафор поддерживает внутренний счетчик, который уменьшается при каждом вызове acquire() и увеличивается при вызове release(). Если счетчик равен нулю, поток, вызывающий acquire(), блокируется до тех пор, пока другой поток не вызовет release().
Пример с threading.Semaphore:
import threading
import time
class ResourceLimiter:
def __init__(self, max_concurrent: int = 5):
# Инициализируем семафор, разрешая одновременный доступ не более чем 5 потокам
self.semaphore = threading.Semaphore(max_concurrent)
def access_resource(self, thread_name: str):
print(f"[{thread_name}] пытается получить доступ...")
# Контекстный менеджер `with` автоматически вызывает acquire() в начале
# и release() в конце блока, даже если возникнет исключение.
with self.semaphore:
print(f"[{thread_name}] получил доступ. Работа с ресурсом...")
time.sleep(2) # Имитация работы
print(f"[{thread_name}] освобождает ресурс.")
# Пример использования
limiter = ResourceLimiter(max_concurrent=3)
threads = []
# Запускаем 10 потоков, но только 3 смогут работать одновременно
for i in range(10):
thread_name = f"Thread-{i+1}"
t = threading.Thread(target=limiter.access_resource, args=(thread_name,))
threads.append(t)
t.start()
for t in threads:
t.join()
Альтернатива для асинхронного кода
В асинхронных приложениях (с использованием asyncio) следует использовать asyncio.Semaphore для тех же целей. Принцип работы аналогичен, но он не блокирует весь поток выполнения.
import asyncio
async def access_resource(semaphore, name):
async with semaphore:
print(f"[{name}] получил доступ...")
await asyncio.sleep(1)
print(f"[{name}] освобождает ресурс.")
async def main():
semaphore = asyncio.Semaphore(3)
tasks = [access_resource(semaphore, f"Task-{i}") for i in range(10)]
await asyncio.gather(*tasks)
# asyncio.run(main()) Ответ 18+ 🔞
Да ты посмотри, что творится! Ну прям как в нашей общаге, когда на всех один чайник, а желающих — дохуя. Так вот, чтобы не устроить драку за кипяток, в Python есть штука под названием threading.Semaphore.
Представь себе, это такой счетчик-турникет, блядь. Внутри у него циферка. Каждый поток, который хочет пройти к общему ресурсу (нашему чайнику), должен сказать acquire() — и циферка уменьшается на единичку. Прошел — молодец. А когда отвалил, кричит release() — и циферка обратно подрастает. И если эта циферка упала в ноль, следующий желающий просто тупо ждет, пока кто-то не освободит место. Проще говоря, очередь, но без физической очереди, ёпта!
Вот смотри, как это выглядит в коде, тут всё честно:
import threading
import time
class ResourceLimiter:
def __init__(self, max_concurrent: int = 5):
# Делаем семафор. Допустим, к нашему "чайнику" могут подойти только 3 потока одновременно.
# Больше — иди жди, пока кто-то не отпьёт, сука.
self.semaphore = threading.Semaphore(max_concurrent)
def access_resource(self, thread_name: str):
print(f"[{thread_name}] пытается получить доступ...")
# А вот это магия! Контекстный менеджер `with` сам всё сделает:
# войдет — вызовет acquire(), выйдет — release(). Даже если посередине всё ебнется.
with self.semaphore:
print(f"[{thread_name}] получил доступ. Работа с ресурсом...")
time.sleep(2) # Прикинься, что что-то делаешь, типа кипятишь воду
print(f"[{thread_name}] освобождает ресурс.")
# Поехали!
limiter = ResourceLimiter(max_concurrent=3)
threads = []
# Запускаем аж 10 потоков-халявщиков, но семафор пропустит только троих за раз.
for i in range(10):
thread_name = f"Thread-{i+1}"
t = threading.Thread(target=limiter.access_resource, args=(thread_name,))
threads.append(t)
t.start()
for t in threads:
t.join()
Выполнишь — увидишь, как первые три лезут, а остальные терпеливо, блядь, ждут своей очереди. Красота!
А если ты вдруг в асинхронном мире живешь, где все бегают по одной дорожке, но по очереди (asyncio), то там свой семафор — asyncio.Semaphore. Принцип тот же, только блокирует не поток, а корутину, пока другие работают.
import asyncio
async def access_resource(semaphore, name):
async with semaphore:
print(f"[{name}] получил доступ...")
await asyncio.sleep(1)
print(f"[{name}] освобождает ресурс.")
async def main():
semaphore = asyncio.Semaphore(3)
tasks = [access_resource(semaphore, f"Task-{i}") for i in range(10)]
await asyncio.gather(*tasks)
# asyncio.run(main())
Вот и вся магия, ебать мои старые костыли. Никаких драк, всё цивильно. Главное — не забудь отпустить семафор, а то остальные так и будут ждать, а ты уже давно в сортире.