Ответ
Для ограничения одновременного доступа к общему ресурсу в многопоточной среде в Python используется примитив синхронизации threading.Semaphore.
Семафор поддерживает внутренний счетчик, который уменьшается при каждом вызове acquire() и увеличивается при вызове release(). Если счетчик равен нулю, поток, вызывающий acquire(), блокируется до тех пор, пока другой поток не вызовет release().
Пример с threading.Semaphore:
import threading
import time
class ResourceLimiter:
def __init__(self, max_concurrent: int = 5):
# Инициализируем семафор, разрешая одновременный доступ не более чем 5 потокам
self.semaphore = threading.Semaphore(max_concurrent)
def access_resource(self, thread_name: str):
print(f"[{thread_name}] пытается получить доступ...")
# Контекстный менеджер `with` автоматически вызывает acquire() в начале
# и release() в конце блока, даже если возникнет исключение.
with self.semaphore:
print(f"[{thread_name}] получил доступ. Работа с ресурсом...")
time.sleep(2) # Имитация работы
print(f"[{thread_name}] освобождает ресурс.")
# Пример использования
limiter = ResourceLimiter(max_concurrent=3)
threads = []
# Запускаем 10 потоков, но только 3 смогут работать одновременно
for i in range(10):
thread_name = f"Thread-{i+1}"
t = threading.Thread(target=limiter.access_resource, args=(thread_name,))
threads.append(t)
t.start()
for t in threads:
t.join()
Альтернатива для асинхронного кода
В асинхронных приложениях (с использованием asyncio) следует использовать asyncio.Semaphore для тех же целей. Принцип работы аналогичен, но он не блокирует весь поток выполнения.
import asyncio
async def access_resource(semaphore, name):
async with semaphore:
print(f"[{name}] получил доступ...")
await asyncio.sleep(1)
print(f"[{name}] освобождает ресурс.")
async def main():
semaphore = asyncio.Semaphore(3)
tasks = [access_resource(semaphore, f"Task-{i}") for i in range(10)]
await asyncio.gather(*tasks)
# asyncio.run(main())