Какие операторы Apache Airflow вы использовали?

Ответ

В своих проектах я активно использовал как стандартные, так и специфичные для облака операторы Airflow для оркестрации разнообразных задач:

  • BashOperator & PythonOperator: Для запуска shell-скриптов и вызова Python-функций соответственно. Это основа многих DAG.

    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    
    def _process_data(ti, **kwargs):
        # Получаем результат предыдущей задачи через XCom
        raw_data = ti.xcom_pull(task_ids='extract_data')
        processed_data = transform_logic(raw_data)
        return processed_data
    
    process_task = PythonOperator(
        task_id='process_data',
        python_callable=_process_data,
        provide_context=True
    )
    
    cleanup_task = BashOperator(
        task_id='cleanup_temp_files',
        bash_command='rm -f /tmp/*.tmp'
    )
  • Сенсоры (Sensors): Использовал S3KeySensor для ожидания появления файла-триггера в S3 и ExternalTaskSensor для ожидания завершения другого DAG.

  • Операторы для работы с БД: PostgresOperator для выполнения SQL в PostgreSQL и BigQueryExecuteQueryOperator для Google BigQuery.

    from airflow.providers.postgres.operators.postgres import PostgresOperator
    
    create_table = PostgresOperator(
        task_id='create_staging_table',
        postgres_conn_id='analytics_db',
        sql="""
        CREATE TABLE IF NOT EXISTS stg_orders (
            id INT,
            created_at TIMESTAMP,
            amount DECIMAL
        );
        """
    )
  • Операторы для контейнеризованных и распределённых задач:

    • DockerOperator для запуска задач в изолированных контейнерах с определённым образом.
    • SparkSubmitOperator для отправки Spark-приложений на кластер YARN или Kubernetes. Это было ключевым для ETL-пайплайнов.
  • KubernetesPodOperator: Для запуска задач в виде подов в Kubernetes, что обеспечивало гибкость в выборе окружения и ресурсов для каждой задачи.

Ответ 18+ 🔞

Да ты посмотри, какие у меня инструменты были в арсенале, ёпта! Прямо как у хитрой жопы, которая знает, куда и чем тыкать. В своих проектах я накрутил овердохуища разных операторов Airflow, чтобы оркестровать всю эту движуху.

  • BashOperator & PythonOperator: Это как хлеб с маслом, основа основ. Первый — чтобы скрипты шелл-запускать, второй — чтобы питонячие функции дёргать. Без них нихуя не соберёшь.

    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    
    def _process_data(ti, **kwargs):
        # Вытаскиваем результат из предыдущей задачи через XCom
        raw_data = ti.xcom_pull(task_ids='extract_data')
        processed_data = transform_logic(raw_data)
        return processed_data
    
    process_task = PythonOperator(
        task_id='process_data',
        python_callable=_process_data,
        provide_context=True
    )
    
    cleanup_task = BashOperator(
        task_id='cleanup_temp_files',
        bash_command='rm -f /tmp/*.tmp'
    )
  • Сенсоры (Sensors): А вот это уже приколы. S3KeySensor — сидит, как собака сука, и ждёт, пока в ведёрке S3 нужный файлик не появится. А ExternalTaskSensor — это вообще мудя, он ждёт, пока другой DAG наконец-то свои дела закончит. Терпения ебать ноль, но ждёт.

  • Операторы для работы с БД: Тут всё просто: PostgresOperator пинает PostgreSQL, а BigQueryExecuteQueryOperator долбит Google BigQuery. SQL туда-сюда, и дело в шляпе.

    from airflow.providers.postgres.operators.postgres import PostgresOperator
    
    create_table = PostgresOperator(
        task_id='create_staging_table',
        postgres_conn_id='analytics_db',
        sql="""
        CREATE TABLE IF NOT EXISTS stg_orders (
            id INT,
            created_at TIMESTAMP,
            amount DECIMAL
        );
        """
    )
  • Операторы для контейнеризованных и распределённых задач: А вот тут начинается магия, ядрёна вошь!

    • DockerOperator — засовывает задачу в контейнер, как в банку, чтобы она там в своём мирке работала. Изоляция полная, доверия ебать ноль ко всему остальному.
    • SparkSubmitOperator — это уже тяжёлая артиллерия. Швыряет Spark-приложение на кластер, будь то YARN или Kubernetes, и пусть там всё горит синим пламенем. Для ETL-пайплайнов — самое то, бля буду.
  • KubernetesPodOperator: Ну а это, чувак, верх гибкости. Запускает каждую задачу как отдельный под в кубере. Хочешь — один образ, хочешь — другой, ресурсы настраивай какие вдумаешь. Прям волшебная палочка, ебать копать.