Ответ
Для оптимизации синхронных HTTP-запросов, особенно при выполнении множества обращений к одному хосту, следует применять несколько ключевых техник.
1. Использование пула соединений (Connection Pooling)
Создание нового TCP-соединения для каждого запроса — дорогостоящая операция. requests.Session решает эту проблему, переиспользуя существующие соединения.
Почему это работает: Устраняются накладные расходы на TCP handshake и TLS negotiation для каждого последующего запроса к тому же хосту.
import requests
urls = [
'https://api.example.com/data/1',
'https://api.example.com/data/2',
'https://api.example.com/data/3'
]
# Создаем сессию один раз
with requests.Session() as session:
for url in urls:
# Соединение переиспользуется
response = session.get(url)
print(f"{url}: {response.status_code}")
2. Параллельное выполнение запросов с помощью потоков
Синхронные запросы по своей природе блокирующие. Пока один запрос ждет ответа от сети (I/O-bound операция), процессор простаивает. ThreadPoolExecutor позволяет выполнять несколько запросов одновременно в разных потоках.
import requests
from concurrent.futures import ThreadPoolExecutor
urls = ['https://example.com'] * 10
def fetch_url(url):
with requests.get(url) as response:
return response.status_code
# Выполняем до 5 запросов параллельно
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(fetch_url, urls))
print(results)
3. Установка разумных таймаутов Чтобы избежать "зависания" приложения из-за медленного ответа сервера, всегда указывайте таймауты.
connect: время на установку соединения.read: время на ожидание первого байта ответа.
# Ожидать соединения не более 3 секунд, ответа - не более 10 секунд
response = requests.get('https://slow-service.com', timeout=(3, 10))
4. Кэширование ответов
Если данные меняются редко, кэшируйте ответы, чтобы избежать повторных запросов. Для простых случаев подойдет functools.lru_cache.
from functools import lru_cache
import requests
@lru_cache(maxsize=128)
def get_user_data(user_id):
print(f"Fetching data for user {user_id}...")
return requests.get(f'https://api.example.com/users/{user_id}').json()
# Первый вызов выполнит HTTP-запрос
user1 = get_user_data(1)
# Второй вызов мгновенно вернет результат из кэша
user1_cached = get_user_data(1)
Для высоконагруженных систем, где узким местом является ожидание I/O, стоит рассмотреть переход на асинхронный подход с asyncio и библиотеками aiohttp или httpx.
Ответ 18+ 🔞
А, слушай, вот тут про оптимизацию HTTP-запросов пишут, ну такое, блядь. Типа, если ты каждый раз новый запрос делаешь, это как каждый раз заводить машину, чтобы до соседнего подъезда доехать — овердохуища бензина сожрёшь, а толку нихуя. Так что лови, как не быть мудаком.
1. Пул соединений — твой новый лучший друг
Вот представь: ты каждый раз новое соединение открываешь, это ж пиздец какой handshake, TLS, танцы с бубном. А можно один раз настроить и кататься как сыр в масле. В requests для этого есть Session. Он как умный чувак, который не закрывает дверь, если знает, что через минуту опять выходить.
import requests
urls = [
'https://api.example.com/data/1',
'https://api.example.com/data/2',
'https://api.example.com/data/3'
]
# Создаём сессию один раз и не выёбываемся
with requests.Session() as session:
for url in urls:
# А тут уже соединение переиспользуется, красота
response = session.get(url)
print(f"{url}: {response.status_code}")
2. Параллелим, блядь, как угорелые
Синхронные запросы — они ж по очереди ползут, как мартышлюшка по верёвочке. Пока один ждёт ответа, процессор тупо смотрит в потолок. Берём ThreadPoolExecutor и запускаем несколько сразу, как тараканов из банки.
import requests
from concurrent.futures import ThreadPoolExecutor
urls = ['https://example.com'] * 10
def fetch_url(url):
with requests.get(url) as response:
return response.status_code
# Запускаем до 5 потоков одновременно — и понеслась
with ThreadPoolExecutor(max_workers=5) as executor:
results = list(executor.map(fetch_url, urls))
print(results)
3. Таймауты, а то сожрут тебя живьём Если сервер тупит, как пизда с ушами, твоё приложение может встать колом на вечность. Не надо так. Ставь ограничения по времени, чтоб знать, когда пора посылать всё нахуй.
# Ждём соединения не больше 3 секунд, ответа — не больше 10
response = requests.get('https://slow-service.com', timeout=(3, 10))
4. Кэширование — память золото, ёпта
Если данные меняются раз в год по обещанию, зачем каждый раз дергать сервер? Кэшируй, блядь! Для простоты — lru_cache, он как шпаргалка в рукаве.
from functools import lru_cache
import requests
@lru_cache(maxsize=128)
def get_user_data(user_id):
print(f"Fetching data for user {user_id}...")
return requests.get(f'https://api.example.com/users/{user_id}').json()
# Первый раз реально сходит на сервер
user1 = get_user_data(1)
# Второй раз вытащит из кэша, как из кармана — быстро и без нервов
user1_cached = get_user_data(1)
А если у тебя там запросов, как говна за баней, и всё упёрлось в ожидание ответов от сети — тогда, чувак, пора завязывать с этой синхронной хуйней и переходить на асинхронность. asyncio, aiohttp или httpx — вот где настоящая магия начинается, но это уже другая история, блядь.