Ответ
В Airflow пересчет витрин данных организуется через DAG, который определяет порядок и зависимости между задачами обновления. Я обычно создаю отдельный DAG для полного пересчета (full refresh) и ежедневного инкрементального обновления.
Пример DAG для пересчета материализованных представлений в PostgreSQL:
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
from airflow.utils.dates import days_ago
default_args = {
'owner': 'data_engineer',
'depends_on_past': False,
'email_on_failure': True,
'email': ['team@example.com'],
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='data_marts_refresh',
default_args=default_args,
description='Полный пересчет витрин данных',
schedule_interval=None, # Запускается вручную
start_date=days_ago(1),
tags=['data_marts', 'refresh'],
catchup=False
) as dag:
# Очистка промежуточных таблиц
cleanup_staging = PostgresOperator(
task_id='cleanup_staging_tables',
sql="""
TRUNCATE TABLE staging.sales_data;
TRUNCATE TABLE staging.user_activity;
""",
postgres_conn_id='postgres_dwh',
)
# Пересчет витрины продаж с CONCURRENTLY для минимизации блокировок
refresh_sales_mart = PostgresOperator(
task_id='refresh_sales_mart',
sql="""
REFRESH MATERIALIZED VIEW CONCURRENTLY mart_sales_daily;
ANALYZE mart_sales_daily;
""",
postgres_conn_id='postgres_dwh',
)
# Пересчет витрины пользовательской активности
refresh_user_mart = PostgresOperator(
task_id='refresh_user_mart',
sql="""
REFRESH MATERIALIZED VIEW CONCURRENTLY mart_user_activity;
ANALYZE mart_user_activity;
""",
postgres_conn_id='postgres_dwh',
)
# Зависимости: сначала очистка, затем параллельный пересчет витрин
cleanup_staging >> [refresh_sales_mart, refresh_user_mart]
Ключевые практики, которые я применяю:
- Использование
CONCURRENTLYдля материализованных представлений PostgreSQL — позволяет читать данные во время обновления - Правильное определение зависимостей между витринами через операторы
>>и[] - Анализ статистики (
ANALYZE) после пересчета для оптимизации запросов - Мониторинг через Airflow UI и алертинг при сбоях
- Для сложных ETL использую
PythonOperatorс логикой на PySpark или Pandas, сохраняя промежуточные результаты в S3/HDFS