Как оптимизировать синхронные HTTP-запросы в Python?

Ответ

Для оптимизации синхронных HTTP-запросов, особенно при выполнении множества обращений к одному хосту, следует применять несколько ключевых техник.

1. Использование пула соединений (Connection Pooling) Создание нового TCP-соединения для каждого запроса — дорогостоящая операция. requests.Session решает эту проблему, переиспользуя существующие соединения.

Почему это работает: Устраняются накладные расходы на TCP handshake и TLS negotiation для каждого последующего запроса к тому же хосту.

import requests

urls = [
    'https://api.example.com/data/1',
    'https://api.example.com/data/2',
    'https://api.example.com/data/3'
]

# Создаем сессию один раз
with requests.Session() as session:
    for url in urls:
        # Соединение переиспользуется
        response = session.get(url)
        print(f"{url}: {response.status_code}")

2. Параллельное выполнение запросов с помощью потоков Синхронные запросы по своей природе блокирующие. Пока один запрос ждет ответа от сети (I/O-bound операция), процессор простаивает. ThreadPoolExecutor позволяет выполнять несколько запросов одновременно в разных потоках.

import requests
from concurrent.futures import ThreadPoolExecutor

urls = ['https://example.com'] * 10

def fetch_url(url):
    with requests.get(url) as response:
        return response.status_code

# Выполняем до 5 запросов параллельно
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fetch_url, urls))

print(results)

3. Установка разумных таймаутов Чтобы избежать "зависания" приложения из-за медленного ответа сервера, всегда указывайте таймауты.

  • connect: время на установку соединения.
  • read: время на ожидание первого байта ответа.
# Ожидать соединения не более 3 секунд, ответа - не более 10 секунд
response = requests.get('https://slow-service.com', timeout=(3, 10))

4. Кэширование ответов Если данные меняются редко, кэшируйте ответы, чтобы избежать повторных запросов. Для простых случаев подойдет functools.lru_cache.

from functools import lru_cache
import requests

@lru_cache(maxsize=128)
def get_user_data(user_id):
    print(f"Fetching data for user {user_id}...")
    return requests.get(f'https://api.example.com/users/{user_id}').json()

# Первый вызов выполнит HTTP-запрос
user1 = get_user_data(1)
# Второй вызов мгновенно вернет результат из кэша
user1_cached = get_user_data(1)

Для высоконагруженных систем, где узким местом является ожидание I/O, стоит рассмотреть переход на асинхронный подход с asyncio и библиотеками aiohttp или httpx.

Ответ 18+ 🔞

А, слушай, вот тут про оптимизацию HTTP-запросов пишут, ну такое, блядь. Типа, если ты каждый раз новый запрос делаешь, это как каждый раз заводить машину, чтобы до соседнего подъезда доехать — овердохуища бензина сожрёшь, а толку нихуя. Так что лови, как не быть мудаком.

1. Пул соединений — твой новый лучший друг Вот представь: ты каждый раз новое соединение открываешь, это ж пиздец какой handshake, TLS, танцы с бубном. А можно один раз настроить и кататься как сыр в масле. В requests для этого есть Session. Он как умный чувак, который не закрывает дверь, если знает, что через минуту опять выходить.

import requests

urls = [
    'https://api.example.com/data/1',
    'https://api.example.com/data/2',
    'https://api.example.com/data/3'
]

# Создаём сессию один раз и не выёбываемся
with requests.Session() as session:
    for url in urls:
        # А тут уже соединение переиспользуется, красота
        response = session.get(url)
        print(f"{url}: {response.status_code}")

2. Параллелим, блядь, как угорелые Синхронные запросы — они ж по очереди ползут, как мартышлюшка по верёвочке. Пока один ждёт ответа, процессор тупо смотрит в потолок. Берём ThreadPoolExecutor и запускаем несколько сразу, как тараканов из банки.

import requests
from concurrent.futures import ThreadPoolExecutor

urls = ['https://example.com'] * 10

def fetch_url(url):
    with requests.get(url) as response:
        return response.status_code

# Запускаем до 5 потоков одновременно — и понеслась
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fetch_url, urls))

print(results)

3. Таймауты, а то сожрут тебя живьём Если сервер тупит, как пизда с ушами, твоё приложение может встать колом на вечность. Не надо так. Ставь ограничения по времени, чтоб знать, когда пора посылать всё нахуй.

# Ждём соединения не больше 3 секунд, ответа — не больше 10
response = requests.get('https://slow-service.com', timeout=(3, 10))

4. Кэширование — память золото, ёпта Если данные меняются раз в год по обещанию, зачем каждый раз дергать сервер? Кэшируй, блядь! Для простоты — lru_cache, он как шпаргалка в рукаве.

from functools import lru_cache
import requests

@lru_cache(maxsize=128)
def get_user_data(user_id):
    print(f"Fetching data for user {user_id}...")
    return requests.get(f'https://api.example.com/users/{user_id}').json()

# Первый раз реально сходит на сервер
user1 = get_user_data(1)
# Второй раз вытащит из кэша, как из кармана — быстро и без нервов
user1_cached = get_user_data(1)

А если у тебя там запросов, как говна за баней, и всё упёрлось в ожидание ответов от сети — тогда, чувак, пора завязывать с этой синхронной хуйней и переходить на асинхронность. asyncio, aiohttp или httpx — вот где настоящая магия начинается, но это уже другая история, блядь.