Как вы работали с Airflow в Kubernetes?

«Как вы работали с Airflow в Kubernetes?» — вопрос из категории Apache Airflow, который задают на 33% собеседований Data Инженер. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Я разворачивал и поддерживал Airflow в продакшене на Kubernetes (EKS). Основная цель — добиться отказоустойчивости, масштабируемости и изоляции задач. Мы использовали связку Helm-чарт airflow (официальный от Apache) + KubernetesExecutor.

Архитектура и ключевые решения:

  • Executor: KubernetesExecutor. Каждая задача запускается в отдельном Pod в том же кластере. Это дает полную изоляцию зависимостей между DAG'ами.
  • База данных: Внешний PostgreSQL RDS для persistence метаданных.
  • Брокер: Не требуется для KubernetesExecutor (в отличие от Celery). Это упрощает архитектуру.
  • Хранилище DAG: DAG-файлы хранились в приватном Git-репозитории и синхронизировались в кластер через sidecar-контейнер git-sync, который монтировал код в общий emptyDir для шедулера и воркеров.

Пример DAG, использующего особенности K8s:

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from datetime import datetime

with DAG(
    dag_id='data_pipeline_k8s',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
) as dag:

    # Задача, запускающая Python-скрипт в специфичном образе
    process_task = KubernetesPodOperator(
        task_id='process_data',
        namespace='airflow-production',
        image='${AIRFLOW_IMAGE_REPO}/data-processor:latest', # Наш кастомный образ
        cmds=['python', '/scripts/process.py'],
        arguments=['--date', '{{ ds }}'],
        name='process-data-pod',
        # Подключаем секреты для доступа к БД
        secrets=[Secret('env', 'DB_PASSWORD', 'airflow-secrets', 'db-password')],
        # Гарантии ресурсов
        resources={'request_memory': '512Mi', 'request_cpu': '250m'},
        get_logs=True,
        is_delete_operator_pod=True, # Удалять Pod после выполнения
        dag=dag
    )

Решенные проблемы:

  • Масштабирование: Использовали Horizontal Pod Autoscaler (HPA) для веб-сервера и шедулера на основе CPU.
  • Логи: Настроили Fluentd sidecar в Pod'ах задач для отправки логов в центральный Loki, так как стандартные логи Airflow терялись после удаления Pod'а.
  • RBAC: Создали отдельный ServiceAccount для Airflow с минимально необходимыми правами на создание Pod'ов в namespace'е. Главный плюс — гибкость: для каждой задачи можно задать свой Docker-образ, ресурсы и секреты, не загрязняя общее окружение Airflow.