Ответ
В Apache Airflow оператор — это шаблон задачи, который описывает единичную работу, которую нужно выполнить. Каждый оператор в DAG представляет собой один конкретный тип действия (например, запуск Python-скрипта, выполнение SQL-запроса или отправку оповещения). Операторы определяют что делать, в то время как как и где выполнять эту задачу определяет исполнитель (Executor).
Основные характеристики:
- Атомарность: Оператор должен выполнять одну логическую операцию.
- Идемпотентность: Повторный запуск оператора с теми же параметрами должен давать тот же результат и не вызывать побочных эффектов. Это ключевое свойство для надежности.
Популярные встроенные операторы:
BashOperator: Выполняет bash-команду или скрипт.PythonOperator: Вызывает произвольную Python-функцию.EmailOperator: Отправляет email.SimpleHttpOperator: Выполняет HTTP-запрос.PostgresOperator,MySqlOperator: Выполняют SQL-запрос в соответствующей БД.DockerOperator: Запускает команду внутри Docker-контейнера.KubernetesPodOperator: Запускает pod в Kubernetes.
Пример DAG с операторами:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
def process_data():
# Логика обработки данных
print("Обработка данных завершена")
# Определение DAG
with DAG(
dag_id='example_etl_dag',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily'
) as dag:
# Оператор 1: Загрузка данных
download_task = BashOperator(
task_id='download_dataset',
bash_command='curl -o /tmp/data.csv https://example.com/data.csv'
)
# Оператор 2: Обработка данных
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data
)
# Оператор 3: Отправка уведомления
notify_task = BashOperator(
task_id='send_notification',
bash_command='echo "ETL пайплайн успешно выполнен" | mail -s "Airflow Alert" admin@example.com'
)
# Определение зависимостей задач
download_task >> process_task >> notify_task
Для обмена небольшими объемами данных между операторами в рамках одного DAG используется механизм XCom. Для более сложных сценариев интеграции данные следует хранить во внешних системах (базы данных, облачное хранилище, очереди сообщений).
Ответ 18+ 🔞
А, слушай, про операторы в Airflow. Ну это, бля, как кирпичики, из которых пайплайн строится. Каждый такой кирпичик — одна конкретная работа. Запустить скрипт, ёпта, баш-команду выполнить, SQL-запрос впендюрить — вот это всё.
Главная фишка в том, что оператор говорит что делать, а вот как и где это делать — это уже забота какого-нибудь исполнителя (Executor), который всё это безобразие и организует.
Что от них хотят, эти операторы:
- Атомарность: Один оператор — одно чёткое действие. Не надо в одну задачу пихать загрузку, трансформацию и отправку письма твоей тёще. Это пиздец как не по фэншую.
- Идемпотентность: Вот это, бля, святое. Запустил оператор — он отработал. Запустил его ещё раз с теми же параметрами — он должен сделать вид, что нихуя не изменилось, и результат должен быть тот же. Если он каждый раз новую хуйню творит — это распиздяйство, а не оператор. Надёжность на этом держится.
Какие они бывают, эти операторы (самые ходовые):
BashOperator: Тыкаешь ему bash-команду — он её и выполняет. Проще пареной репы.PythonOperator: Вызывает любую твою питонячью функцию. Универсальная штука.EmailOperator: Шлёт письма. "Всё сломалось, иди чини" — классика.SimpleHttpOperator: Тыкает палкой в какой-нибудь HTTP-эндпоинт.PostgresOperator,MySqlOperator: Для тех, кто в базах данных как рыба в воде. Суют SQL-запросы куда надо.DockerOperator: Запускает всю эту хуйню в докер-контейнере, чтобы у тебя на хосте чисто было.KubernetesPodOperator: Это уже для мажоров, которые в кубересах плавают. Запускает целый под.
Вот, смотри, как это выглядит в коде, чтоб ты понимал масштаб:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime
def process_data():
# Тут твоя хитрая логика обработки
print("Обработка данных завершена")
# Создаём DAG, ёпта
with DAG(
dag_id='example_etl_dag',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily'
) as dag:
# Задача 1: Скачать данные. BashOperator в деле.
download_task = BashOperator(
task_id='download_dataset',
bash_command='curl -o /tmp/data.csv https://example.com/data.csv'
)
# Задача 2: Обработать их. Питон рулит.
process_task = PythonOperator(
task_id='process_data',
python_callable=process_data
)
# Задача 3: Орать, что всё готово. Опять баш.
notify_task = BashOperator(
task_id='send_notification',
bash_command='echo "ETL пайплайн успешно выполнен" | mail -s "Airflow Alert" admin@example.com'
)
# А вот это магия — говорим, в каком порядке это всё делать.
download_task >> process_task >> notify_task
И последнее, про обмен данными. Если задачам нужно передать друг другу какую-то мелкую хуйню (типа статуса или ID), есть XCom. Но, бля, это не помойка! Не тащи туда гигабайты данных, а то будет тебе хиросима. Для серьёзных объёмов — только внешние системы: базы, облака, очереди. А XCom — для записок "сделано, иди дальше".