Какие Python-фреймворки ты использовал?

«Какие Python-фреймворки ты использовал?» — вопрос из категории Python, который задают на 33% собеседований Data Инженер. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

В контексте разработки ETL-пайплайнов, аналитики данных и создания сервисов для их обработки я активно работал со следующим стеком:

Для обработки и анализа данных:

  • Pandas: Для прототипирования, исследования данных и задач, где данные помещаются в память одной машины. Незаменим для быстрой агрегации, очистки и трансформации.
  • PySpark: Для промышленной обработки больших объемов данных, распределенных по кластеру. Использовал для сложных ETL-задач, где нужно было объединять терабайты логов из разных источников.

Для создания API и backend-сервисов:

  • FastAPI: Для разработки высокопроизводительных REST API, которые предоставляли доступ к результатам агрегаций или моделям машинного обучения. Его асинхронность и автоматическая генерация OpenAPI-документации сильно ускоряли разработку.
  • SQLAlchemy (Core + ORM): В качестве основного инструмента для работы с реляционными БД (в основном PostgreSQL) из Python-приложений. Core использовал для сложных запросов в ETL, ORM — в веб-сервисах.

Для оркестрации и планирования задач:

  • Apache Airflow: Для создания, мониторинга и управления графами зависимостей ETL-задач (DAG). Позволял выстраивать сложные пайплайны с перезапуском при сбоях, alerting'ом и логированием.

Пример типичного пайплайна (Airflow DAG + PySpark):

# Упрощенный пример задачи в Airflow DAG
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime

default_args = {'owner': 'data_team'}

dag = DAG('daily_etl', default_args=default_args, schedule_interval='@daily')

# Задача, запускающая PySpark-джоб
process_events = SparkSubmitOperator(
    task_id='process_user_events',
    application='/jobs/process_events.py', # Основная логика на PySpark
    conn_id='spark_default',
    dag=dag
)

Код внутри process_events.py (упрощенно):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

spark = SparkSession.builder.appName("DailyETL").getOrCreate()

# Чтение сырых данных из HDFS (в формате Parquet)
raw_df = spark.read.parquet("/data-lake/raw/events/*")

# Очистка, фильтрация, агрегация
processed_df = (raw_df
    .filter(col("event_time") > "2024-01-01")
    .groupBy("user_id", "date")
    .agg(sum("session_duration").alias("total_duration"))
)

# Запись результата для аналитиков
processed_df.write.mode("overwrite").parquet("/data-lake/aggregated/daily_sessions/")

Такой стек позволяет покрывать весь цикл работы с данными: от сырых логов до готовых агрегатов и API для их потребления.