Ответ
В своих проектах я активно использовал как стандартные, так и специфичные для облака операторы Airflow для оркестрации разнообразных задач:
-
BashOperator & PythonOperator: Для запуска shell-скриптов и вызова Python-функций соответственно. Это основа многих DAG.
from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator def _process_data(ti, **kwargs): # Получаем результат предыдущей задачи через XCom raw_data = ti.xcom_pull(task_ids='extract_data') processed_data = transform_logic(raw_data) return processed_data process_task = PythonOperator( task_id='process_data', python_callable=_process_data, provide_context=True ) cleanup_task = BashOperator( task_id='cleanup_temp_files', bash_command='rm -f /tmp/*.tmp' ) -
Сенсоры (Sensors): Использовал
S3KeySensorдля ожидания появления файла-триггера в S3 иExternalTaskSensorдля ожидания завершения другого DAG. -
Операторы для работы с БД:
PostgresOperatorдля выполнения SQL в PostgreSQL иBigQueryExecuteQueryOperatorдля Google BigQuery.from airflow.providers.postgres.operators.postgres import PostgresOperator create_table = PostgresOperator( task_id='create_staging_table', postgres_conn_id='analytics_db', sql=""" CREATE TABLE IF NOT EXISTS stg_orders ( id INT, created_at TIMESTAMP, amount DECIMAL ); """ ) -
Операторы для контейнеризованных и распределённых задач:
DockerOperatorдля запуска задач в изолированных контейнерах с определённым образом.SparkSubmitOperatorдля отправки Spark-приложений на кластер YARN или Kubernetes. Это было ключевым для ETL-пайплайнов.
-
KubernetesPodOperator: Для запуска задач в виде подов в Kubernetes, что обеспечивало гибкость в выборе окружения и ресурсов для каждой задачи.
Ответ 18+ 🔞
Да ты посмотри, какие у меня инструменты были в арсенале, ёпта! Прямо как у хитрой жопы, которая знает, куда и чем тыкать. В своих проектах я накрутил овердохуища разных операторов Airflow, чтобы оркестровать всю эту движуху.
-
BashOperator & PythonOperator: Это как хлеб с маслом, основа основ. Первый — чтобы скрипты шелл-запускать, второй — чтобы питонячие функции дёргать. Без них нихуя не соберёшь.
from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator def _process_data(ti, **kwargs): # Вытаскиваем результат из предыдущей задачи через XCom raw_data = ti.xcom_pull(task_ids='extract_data') processed_data = transform_logic(raw_data) return processed_data process_task = PythonOperator( task_id='process_data', python_callable=_process_data, provide_context=True ) cleanup_task = BashOperator( task_id='cleanup_temp_files', bash_command='rm -f /tmp/*.tmp' ) -
Сенсоры (Sensors): А вот это уже приколы.
S3KeySensor— сидит, как собака сука, и ждёт, пока в ведёрке S3 нужный файлик не появится. АExternalTaskSensor— это вообще мудя, он ждёт, пока другой DAG наконец-то свои дела закончит. Терпения ебать ноль, но ждёт. -
Операторы для работы с БД: Тут всё просто:
PostgresOperatorпинает PostgreSQL, аBigQueryExecuteQueryOperatorдолбит Google BigQuery. SQL туда-сюда, и дело в шляпе.from airflow.providers.postgres.operators.postgres import PostgresOperator create_table = PostgresOperator( task_id='create_staging_table', postgres_conn_id='analytics_db', sql=""" CREATE TABLE IF NOT EXISTS stg_orders ( id INT, created_at TIMESTAMP, amount DECIMAL ); """ ) -
Операторы для контейнеризованных и распределённых задач: А вот тут начинается магия, ядрёна вошь!
DockerOperator— засовывает задачу в контейнер, как в банку, чтобы она там в своём мирке работала. Изоляция полная, доверия ебать ноль ко всему остальному.SparkSubmitOperator— это уже тяжёлая артиллерия. Швыряет Spark-приложение на кластер, будь то YARN или Kubernetes, и пусть там всё горит синим пламенем. Для ETL-пайплайнов — самое то, бля буду.
-
KubernetesPodOperator: Ну а это, чувак, верх гибкости. Запускает каждую задачу как отдельный под в кубере. Хочешь — один образ, хочешь — другой, ресурсы настраивай какие вдумаешь. Прям волшебная палочка, ебать копать.