Как с помощью Apache Airflow пересчитать витрины данных?

«Как с помощью Apache Airflow пересчитать витрины данных?» — вопрос из категории Apache Airflow, который задают на 33% собеседований Data Инженер. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

В Airflow пересчет витрин данных организуется через DAG, который определяет порядок и зависимости между задачами обновления. Я обычно создаю отдельный DAG для полного пересчета (full refresh) и ежедневного инкрементального обновления.

Пример DAG для пересчета материализованных представлений в PostgreSQL:

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'data_engineer',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['team@example.com'],
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='data_marts_refresh',
    default_args=default_args,
    description='Полный пересчет витрин данных',
    schedule_interval=None,  # Запускается вручную
    start_date=days_ago(1),
    tags=['data_marts', 'refresh'],
    catchup=False
) as dag:

    # Очистка промежуточных таблиц
    cleanup_staging = PostgresOperator(
        task_id='cleanup_staging_tables',
        sql="""
        TRUNCATE TABLE staging.sales_data;
        TRUNCATE TABLE staging.user_activity;
        """,
        postgres_conn_id='postgres_dwh',
    )

    # Пересчет витрины продаж с CONCURRENTLY для минимизации блокировок
    refresh_sales_mart = PostgresOperator(
        task_id='refresh_sales_mart',
        sql="""
        REFRESH MATERIALIZED VIEW CONCURRENTLY mart_sales_daily;
        ANALYZE mart_sales_daily;
        """,
        postgres_conn_id='postgres_dwh',
    )

    # Пересчет витрины пользовательской активности
    refresh_user_mart = PostgresOperator(
        task_id='refresh_user_mart',
        sql="""
        REFRESH MATERIALIZED VIEW CONCURRENTLY mart_user_activity;
        ANALYZE mart_user_activity;
        """,
        postgres_conn_id='postgres_dwh',
    )

    # Зависимости: сначала очистка, затем параллельный пересчет витрин
    cleanup_staging >> [refresh_sales_mart, refresh_user_mart]

Ключевые практики, которые я применяю:

  1. Использование CONCURRENTLY для материализованных представлений PostgreSQL — позволяет читать данные во время обновления
  2. Правильное определение зависимостей между витринами через операторы >> и []
  3. Анализ статистики (ANALYZE) после пересчета для оптимизации запросов
  4. Мониторинг через Airflow UI и алертинг при сбоях
  5. Для сложных ETL использую PythonOperator с логикой на PySpark или Pandas, сохраняя промежуточные результаты в S3/HDFS