Что такое оператор в Apache Airflow?

Ответ

В Apache Airflow оператор — это шаблон задачи, который описывает единичную работу, которую нужно выполнить. Каждый оператор в DAG представляет собой один конкретный тип действия (например, запуск Python-скрипта, выполнение SQL-запроса или отправку оповещения). Операторы определяют что делать, в то время как как и где выполнять эту задачу определяет исполнитель (Executor).

Основные характеристики:

  • Атомарность: Оператор должен выполнять одну логическую операцию.
  • Идемпотентность: Повторный запуск оператора с теми же параметрами должен давать тот же результат и не вызывать побочных эффектов. Это ключевое свойство для надежности.

Популярные встроенные операторы:

  • BashOperator: Выполняет bash-команду или скрипт.
  • PythonOperator: Вызывает произвольную Python-функцию.
  • EmailOperator: Отправляет email.
  • SimpleHttpOperator: Выполняет HTTP-запрос.
  • PostgresOperator, MySqlOperator: Выполняют SQL-запрос в соответствующей БД.
  • DockerOperator: Запускает команду внутри Docker-контейнера.
  • KubernetesPodOperator: Запускает pod в Kubernetes.

Пример DAG с операторами:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data():
    # Логика обработки данных
    print("Обработка данных завершена")

# Определение DAG
with DAG(
    dag_id='example_etl_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
) as dag:

    # Оператор 1: Загрузка данных
    download_task = BashOperator(
        task_id='download_dataset',
        bash_command='curl -o /tmp/data.csv https://example.com/data.csv'
    )

    # Оператор 2: Обработка данных
    process_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data
    )

    # Оператор 3: Отправка уведомления
    notify_task = BashOperator(
        task_id='send_notification',
        bash_command='echo "ETL пайплайн успешно выполнен" | mail -s "Airflow Alert" admin@example.com'
    )

    # Определение зависимостей задач
    download_task >> process_task >> notify_task

Для обмена небольшими объемами данных между операторами в рамках одного DAG используется механизм XCom. Для более сложных сценариев интеграции данные следует хранить во внешних системах (базы данных, облачное хранилище, очереди сообщений).

Ответ 18+ 🔞

А, слушай, про операторы в Airflow. Ну это, бля, как кирпичики, из которых пайплайн строится. Каждый такой кирпичик — одна конкретная работа. Запустить скрипт, ёпта, баш-команду выполнить, SQL-запрос впендюрить — вот это всё.

Главная фишка в том, что оператор говорит что делать, а вот как и где это делать — это уже забота какого-нибудь исполнителя (Executor), который всё это безобразие и организует.

Что от них хотят, эти операторы:

  • Атомарность: Один оператор — одно чёткое действие. Не надо в одну задачу пихать загрузку, трансформацию и отправку письма твоей тёще. Это пиздец как не по фэншую.
  • Идемпотентность: Вот это, бля, святое. Запустил оператор — он отработал. Запустил его ещё раз с теми же параметрами — он должен сделать вид, что нихуя не изменилось, и результат должен быть тот же. Если он каждый раз новую хуйню творит — это распиздяйство, а не оператор. Надёжность на этом держится.

Какие они бывают, эти операторы (самые ходовые):

  • BashOperator: Тыкаешь ему bash-команду — он её и выполняет. Проще пареной репы.
  • PythonOperator: Вызывает любую твою питонячью функцию. Универсальная штука.
  • EmailOperator: Шлёт письма. "Всё сломалось, иди чини" — классика.
  • SimpleHttpOperator: Тыкает палкой в какой-нибудь HTTP-эндпоинт.
  • PostgresOperator, MySqlOperator: Для тех, кто в базах данных как рыба в воде. Суют SQL-запросы куда надо.
  • DockerOperator: Запускает всю эту хуйню в докер-контейнере, чтобы у тебя на хосте чисто было.
  • KubernetesPodOperator: Это уже для мажоров, которые в кубересах плавают. Запускает целый под.

Вот, смотри, как это выглядит в коде, чтоб ты понимал масштаб:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data():
    # Тут твоя хитрая логика обработки
    print("Обработка данных завершена")

# Создаём DAG, ёпта
with DAG(
    dag_id='example_etl_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily'
) as dag:

    # Задача 1: Скачать данные. BashOperator в деле.
    download_task = BashOperator(
        task_id='download_dataset',
        bash_command='curl -o /tmp/data.csv https://example.com/data.csv'
    )

    # Задача 2: Обработать их. Питон рулит.
    process_task = PythonOperator(
        task_id='process_data',
        python_callable=process_data
    )

    # Задача 3: Орать, что всё готово. Опять баш.
    notify_task = BashOperator(
        task_id='send_notification',
        bash_command='echo "ETL пайплайн успешно выполнен" | mail -s "Airflow Alert" admin@example.com'
    )

    # А вот это магия — говорим, в каком порядке это всё делать.
    download_task >> process_task >> notify_task

И последнее, про обмен данными. Если задачам нужно передать друг другу какую-то мелкую хуйню (типа статуса или ID), есть XCom. Но, бля, это не помойка! Не тащи туда гигабайты данных, а то будет тебе хиросима. Для серьёзных объёмов — только внешние системы: базы, облака, очереди. А XCom — для записок "сделано, иди дальше".