Какие инструменты ETL использовал (Apache Airflow, Apache Spark)?

Ответ

Основной стек для оркестрации и обработки данных включал Apache Airflow и Apache Spark.

Apache Airflow использовал для планирования, мониторинга и управления зависимостями ETL-пайплайнов. Писал DAG-и, используя различные операторы (PythonOperator, PostgresOperator, BigQueryExecuteQueryOperator).

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def transform_data():
    # Логика трансформации, например, с использованием Pandas
    import pandas as pd
    df = pd.read_csv('/tmp/raw_data.csv')
    df['processed_at'] = pd.Timestamp.now()
    df.to_parquet('/tmp/processed_data.parquet')

default_args = {
    'owner': 'data_team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='daily_sales_etl',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1)
) as dag:
    transform_task = PythonOperator(
        task_id='transform_sales_data',
        python_callable=transform_data
    )

Apache Spark (PySpark) применял для распределённой обработки больших объёмов данных, когда входящие данные не помещались в память одной машины. Работал с DataFrame API, оптимизировал работу через правильное партиционирование и кэширование.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

spark = SparkSession.builder.appName("SalesAggregation").getOrCreate()

# Чтение данных
sales_df = spark.read.parquet("s3://bucket/raw_sales/")

# Трансформация и агрегация
daily_revenue = (sales_df
                 .groupBy("date")
                 .agg(sum("amount").alias("total_revenue"))
                 )

# Запись результата
daily_revenue.write.mode("overwrite").parquet("s3://bucket/processed/daily_revenue/")

Для более простых задач использовал Pandas в связке с SQLAlchemy для загрузки данных из БД и dbt для трансформации данных непосредственно в хранилище.

Ответ 18+ 🔞

А, слушай, вот это тема! Ну, значит, так. Основной наш стек, на котором всё держалось — это Apache Airflow и Apache Spark. Без них нихуя бы не получилось, честно говоря.

Apache Airflow — это, блядь, такая штука, которая всё планирует, смотрит за процессами и следит, чтобы один пайплайн в другой не влез без очереди. Я там накодил дохуища DAG-ов, используя всякие операторы — PythonOperator, PostgresOperator, этот, как его... BigQueryExecuteQueryOperator. В общем, ёперный театр, но удобный.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def transform_data():
    # Логика трансформации, например, с использованием Pandas
    import pandas as pd
    df = pd.read_csv('/tmp/raw_data.csv')
    df['processed_at'] = pd.Timestamp.now()
    df.to_parquet('/tmp/processed_data.parquet')

default_args = {
    'owner': 'data_team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='daily_sales_etl',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1)
) as dag:
    transform_task = PythonOperator(
        task_id='transform_sales_data',
        python_callable=transform_data
    )

А вот Apache Spark (PySpark) — это уже другая история, чувак. Это когда данные такие огромные, что на одной машине уже не влезают, и память говорит тебе «иди ты нахуй». Вот тут Spark вступает в игру. Работал через их DataFrame API, постоянно думал, как бы это всё правильно распартиционировать и закэшировать, чтобы не жрало ресурсы, как не в себя.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

spark = SparkSession.builder.appName("SalesAggregation").getOrCreate()

# Чтение данных
sales_df = spark.read.parquet("s3://bucket/raw_sales/")

# Трансформация и агрегация
daily_revenue = (sales_df
                 .groupBy("date")
                 .agg(sum("amount").alias("total_revenue"))
                 )

# Запись результата
daily_revenue.write.mode("overwrite").parquet("s3://bucket/processed/daily_revenue/")

Ну а для всякой мелкой, не такой уж и объёмной хуйни, использовал старый добрый Pandas. Подключал его к SQLAlchemy, чтобы из базы данные вытаскивать, и иногда юзал dbt, чтобы трансформировать данные прямо внутри хранилища — хитрая жопа, но иногда очень удобно. В общем, инструментов было овердохуища, главное — не запутаться и не накосячить.