Ответ
Да, активно использовал Airflow REST API для автоматизации и интеграции с внешними системами.
Основные сценарии применения:
-
Оркестрация пайплайнов извне:
- Запуск DAG (
POST /api/v1/dags/{dag_id}/dagRuns) по событию из внешней системы. Например, после успешной загрузки нового файла в S3 наша файловая система отправляла webhook, который триггерил запуск соответствующего DAG через API.# Пример кода для запуска DAG с конфигурацией import requests
response = requests.post( 'http://airflow.example.com/api/v1/dags/process_s3_file/dagRuns', headers={'Authorization': 'Bearer ...', 'Content-Type': 'application/json'}, json={ "conf": {"s3_key": "data/2023-10-01.csv"}, "dag_run_id": "manual__2023-10-01T12:00:00" } )
- Запуск DAG (
-
Мониторинг и алертинг:
- Получение статуса выполнения DAG Run (
GET /api/v1/dags/{dag_id}/dagRuns/{dag_run_id}) для интеграции в корпоративные дашборды (Grafana) или отправки кастомных уведомлений в Slack/Telegram при сбоях. - Получение списка "зависших" задач (запущенных слишком долго) для автоматического реагирования.
- Получение статуса выполнения DAG Run (
-
Динамическое управление DAG:
- В одном из проектов мы использовали API для паузы/возобновления (
PATCH /api/v1/dags/{dag_id}) целых групп DAG во время плановых работ на источниках данных.
- В одном из проектов мы использовали API для паузы/возобновления (
-
Интеграция с CI/CD:
- На этапе деплоя скрипт в CI-пайплайне (например, GitHub Actions) проверял через API, что новая версия DAG успешно загружена в Airflow и прошла парсинг без ошибок (
GET /api/v1/dags/{dag_id}/details).
- На этапе деплоя скрипт в CI-пайплайне (например, GitHub Actions) проверял через API, что новая версия DAG успешно загружена в Airflow и прошла парсинг без ошибок (
Использование API позволило сделать Airflow не изолированным оркестратором, а активной частью общей инфраструктуры, управляемой программно.