Ответ
В контексте разработки ETL-пайплайнов, аналитики данных и создания сервисов для их обработки я активно работал со следующим стеком:
Для обработки и анализа данных:
- Pandas: Для прототипирования, исследования данных и задач, где данные помещаются в память одной машины. Незаменим для быстрой агрегации, очистки и трансформации.
- PySpark: Для промышленной обработки больших объемов данных, распределенных по кластеру. Использовал для сложных ETL-задач, где нужно было объединять терабайты логов из разных источников.
Для создания API и backend-сервисов:
- FastAPI: Для разработки высокопроизводительных REST API, которые предоставляли доступ к результатам агрегаций или моделям машинного обучения. Его асинхронность и автоматическая генерация OpenAPI-документации сильно ускоряли разработку.
- SQLAlchemy (Core + ORM): В качестве основного инструмента для работы с реляционными БД (в основном PostgreSQL) из Python-приложений. Core использовал для сложных запросов в ETL, ORM — в веб-сервисах.
Для оркестрации и планирования задач:
- Apache Airflow: Для создания, мониторинга и управления графами зависимостей ETL-задач (DAG). Позволял выстраивать сложные пайплайны с перезапуском при сбоях, alerting'ом и логированием.
Пример типичного пайплайна (Airflow DAG + PySpark):
# Упрощенный пример задачи в Airflow DAG
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from datetime import datetime
default_args = {'owner': 'data_team'}
dag = DAG('daily_etl', default_args=default_args, schedule_interval='@daily')
# Задача, запускающая PySpark-джоб
process_events = SparkSubmitOperator(
task_id='process_user_events',
application='/jobs/process_events.py', # Основная логика на PySpark
conn_id='spark_default',
dag=dag
)
Код внутри process_events.py (упрощенно):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
spark = SparkSession.builder.appName("DailyETL").getOrCreate()
# Чтение сырых данных из HDFS (в формате Parquet)
raw_df = spark.read.parquet("/data-lake/raw/events/*")
# Очистка, фильтрация, агрегация
processed_df = (raw_df
.filter(col("event_time") > "2024-01-01")
.groupBy("user_id", "date")
.agg(sum("session_duration").alias("total_duration"))
)
# Запись результата для аналитиков
processed_df.write.mode("overwrite").parquet("/data-lake/aggregated/daily_sessions/")
Такой стек позволяет покрывать весь цикл работы с данными: от сырых логов до готовых агрегатов и API для их потребления.