Ответ
Основная структурная единица в Airflow — это DAG (Directed Acyclic Graph), или направленный ациклический граф.
Что это значит на практике:
- Directed (Направленный): Задачи (tasks) имеют четкие зависимости — одна задача выполняется после другой. Это направление задается операторами
>>и<<. - Acyclic (Ациклический): В графе не может быть циклов. Задача не может зависеть от самой себя ни прямо, ни через цепочку других задач. Это гарантирует, что пайплайн когда-нибудь завершится.
- Graph (Граф): Задачи и зависимости между ними визуализируются как граф в Airflow UI, что делает отладку и мониторинг интуитивно понятными.
Пример простого DAG на Python:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
def process_data():
# Логика обработки данных
print("Обрабатываю данные...")
# ...
# Определение DAG
with DAG(
dag_id='my_etl_pipeline',
start_date=datetime(2024, 1, 1),
schedule_interval=timedelta(days=1), # Запуск раз в день
catchup=False,
default_args={
'retries': 2,
'retry_delay': timedelta(minutes=5),
},
) as dag:
# Задача 1: Загрузка данных
download = BashOperator(
task_id='download_dataset',
bash_command='curl -o /tmp/data.csv https://example.com/data.csv'
)
# Задача 2: Обработка данных
process = PythonOperator(
task_id='process_data',
python_callable=process_data
)
# Задача 3: Отправка уведомления
notify = BashOperator(
task_id='send_notification',
bash_command='echo "ETL завершен" | mail -s "Airflow" team@example.com'
)
# Определение зависимостей: download -> process -> notify
download >> process >> notify
В этом примере my_etl_pipeline — это DAG, содержащий три задачи, которые выполняются строго последовательно. Весь этот код живет в одном Python-файле в папке dags/ вашего Airflow.