Ответ
Основной стек для оркестрации и обработки данных включал Apache Airflow и Apache Spark.
Apache Airflow использовал для планирования, мониторинга и управления зависимостями ETL-пайплайнов. Писал DAG-и, используя различные операторы (PythonOperator, PostgresOperator, BigQueryExecuteQueryOperator).
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def transform_data():
# Логика трансформации, например, с использованием Pandas
import pandas as pd
df = pd.read_csv('/tmp/raw_data.csv')
df['processed_at'] = pd.Timestamp.now()
df.to_parquet('/tmp/processed_data.parquet')
default_args = {
'owner': 'data_team',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='daily_sales_etl',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2023, 1, 1)
) as dag:
transform_task = PythonOperator(
task_id='transform_sales_data',
python_callable=transform_data
)
Apache Spark (PySpark) применял для распределённой обработки больших объёмов данных, когда входящие данные не помещались в память одной машины. Работал с DataFrame API, оптимизировал работу через правильное партиционирование и кэширование.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
spark = SparkSession.builder.appName("SalesAggregation").getOrCreate()
# Чтение данных
sales_df = spark.read.parquet("s3://bucket/raw_sales/")
# Трансформация и агрегация
daily_revenue = (sales_df
.groupBy("date")
.agg(sum("amount").alias("total_revenue"))
)
# Запись результата
daily_revenue.write.mode("overwrite").parquet("s3://bucket/processed/daily_revenue/")
Для более простых задач использовал Pandas в связке с SQLAlchemy для загрузки данных из БД и dbt для трансформации данных непосредственно в хранилище.
Ответ 18+ 🔞
А, слушай, вот это тема! Ну, значит, так. Основной наш стек, на котором всё держалось — это Apache Airflow и Apache Spark. Без них нихуя бы не получилось, честно говоря.
Apache Airflow — это, блядь, такая штука, которая всё планирует, смотрит за процессами и следит, чтобы один пайплайн в другой не влез без очереди. Я там накодил дохуища DAG-ов, используя всякие операторы — PythonOperator, PostgresOperator, этот, как его... BigQueryExecuteQueryOperator. В общем, ёперный театр, но удобный.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
def transform_data():
# Логика трансформации, например, с использованием Pandas
import pandas as pd
df = pd.read_csv('/tmp/raw_data.csv')
df['processed_at'] = pd.Timestamp.now()
df.to_parquet('/tmp/processed_data.parquet')
default_args = {
'owner': 'data_team',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='daily_sales_etl',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2023, 1, 1)
) as dag:
transform_task = PythonOperator(
task_id='transform_sales_data',
python_callable=transform_data
)
А вот Apache Spark (PySpark) — это уже другая история, чувак. Это когда данные такие огромные, что на одной машине уже не влезают, и память говорит тебе «иди ты нахуй». Вот тут Spark вступает в игру. Работал через их DataFrame API, постоянно думал, как бы это всё правильно распартиционировать и закэшировать, чтобы не жрало ресурсы, как не в себя.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
spark = SparkSession.builder.appName("SalesAggregation").getOrCreate()
# Чтение данных
sales_df = spark.read.parquet("s3://bucket/raw_sales/")
# Трансформация и агрегация
daily_revenue = (sales_df
.groupBy("date")
.agg(sum("amount").alias("total_revenue"))
)
# Запись результата
daily_revenue.write.mode("overwrite").parquet("s3://bucket/processed/daily_revenue/")
Ну а для всякой мелкой, не такой уж и объёмной хуйни, использовал старый добрый Pandas. Подключал его к SQLAlchemy, чтобы из базы данные вытаскивать, и иногда юзал dbt, чтобы трансформировать данные прямо внутри хранилища — хитрая жопа, но иногда очень удобно. В общем, инструментов было овердохуища, главное — не запутаться и не накосячить.