Как выполнить JOIN-запрос в SQLAlchemy?

Ответ

В SQLAlchemy JOIN-запросы можно выполнять двумя основными способами: через ORM (объектно-реляционное отображение) и через Core (прямое построение SQL-выражений).

1. ORM-подход

Это наиболее распространенный способ при работе с моделями данных. SQLAlchemy автоматически определяет условие соединения, если между моделями настроены отношения (relationship).

Предположим, есть модели User и Address со связью User.addresses.

INNER JOIN

Используется метод .join(). Он соединяет таблицы по настроенному отношению.

from sqlalchemy.orm import sessionmaker

# Найти всех пользователей, у которых есть адрес в Калифорнии
query = session.query(User).join(Address).filter(Address.state == 'CA')
california_users = query.all()

LEFT OUTER JOIN

Используется метод .outerjoin() или join(..., isouter=True). Полезен, когда нужно получить все записи из левой таблицы, даже если для них нет совпадений в правой.

# Найти всех пользователей и их адреса (включая пользователей без адресов)
query = session.query(User, Address).outerjoin(Address)
all_users_with_addresses = query.all()

Жадная загрузка (Eager Loading)

Чтобы избежать проблемы N+1 запросов при доступе к связанным объектам, используйте joinedload.

from sqlalchemy.orm import joinedload

# Загрузить всех пользователей и сразу же их адреса одним запросом
users = session.query(User).options(joinedload(User.addresses)).all()

# Доступ к user.addresses не вызовет дополнительных запросов к БД
for user in users:
    for address in user.addresses:
        print(user.name, address.email_address)

2. Core-подход

Этот подход ближе к написанию чистого SQL и используется для более сложных или оптимизированных запросов, работая напрямую с объектами Table.

from sqlalchemy import select, text

# users и addresses - это объекты Table
stmt = select(users.c.name, addresses.c.email_address)
    .select_from(users.join(addresses, users.c.id == addresses.c.user_id))
    .where(users.c.name.like('j%'))

with engine.connect() as connection:
    result = connection.execute(stmt)
    for row in result:
        print(row)