Приходилось ли использовать Apache Airflow API? Если да, для каких задач?

«Приходилось ли использовать Apache Airflow API? Если да, для каких задач?» — вопрос из категории Apache Airflow, который задают на 33% собеседований Data Инженер. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Да, активно использовал Airflow REST API для автоматизации и интеграции с внешними системами.

Основные сценарии применения:

  1. Оркестрация пайплайнов извне:

    • Запуск DAG (POST /api/v1/dags/{dag_id}/dagRuns) по событию из внешней системы. Например, после успешной загрузки нового файла в S3 наша файловая система отправляла webhook, который триггерил запуск соответствующего DAG через API.
      
      # Пример кода для запуска DAG с конфигурацией
      import requests

    response = requests.post( 'http://airflow.example.com/api/v1/dags/process_s3_file/dagRuns', headers={'Authorization': 'Bearer ...', 'Content-Type': 'application/json'}, json={ "conf": {"s3_key": "data/2023-10-01.csv"}, "dag_run_id": "manual__2023-10-01T12:00:00" } )

  2. Мониторинг и алертинг:

    • Получение статуса выполнения DAG Run (GET /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}) для интеграции в корпоративные дашборды (Grafana) или отправки кастомных уведомлений в Slack/Telegram при сбоях.
    • Получение списка "зависших" задач (запущенных слишком долго) для автоматического реагирования.
  3. Динамическое управление DAG:

    • В одном из проектов мы использовали API для паузы/возобновления (PATCH /api/v1/dags/{dag_id}) целых групп DAG во время плановых работ на источниках данных.
  4. Интеграция с CI/CD:

    • На этапе деплоя скрипт в CI-пайплайне (например, GitHub Actions) проверял через API, что новая версия DAG успешно загружена в Airflow и прошла парсинг без ошибок (GET /api/v1/dags/{dag_id}/details).

Использование API позволило сделать Airflow не изолированным оркестратором, а активной частью общей инфраструктуры, управляемой программно.