Ответ
В Airflow сенсоры (Sensors) я использовал для ожидания выполнения внешних условий перед запуском DAG или задачи. Это позволяет строить отказоустойчивые и зависимые пайплайны.
Основные сценарии применения:
- Ожидание появления файла: Запуск обработки только после того, как в S3-бакете или на SFTP-сервере появится файл с данными за определённую дату.
- Зависимость от завершения внешнего процесса: Ожидание, пока другая система (например, Spark-джоба или сторонний API) завершит свою работу и выставит флаг (например, запишет метку в БД).
- Проверка доступности данных в БД: Ожидание, когда в целевой таблице появятся свежие записи, необходимые для следующего этапа трансформации.
Пример сенсора для ожидания файла в S3:
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_for_file = S3KeySensor(
task_id='wait_for_daily_file',
bucket_name='my-data-bucket',
bucket_key='data/landing/{{ ds }}/input.csv',
aws_conn_id='aws_default',
mode='poke',
poke_interval=60, # Проверять каждые 60 секунд
timeout=60 * 30 # Таймаут 30 минут
)
Сенсор будет «пинговать» S3 раз в минуту, и задача process_data запустится только после того, как файл появится.
Ответ 18+ 🔞
А, сенсоры в Airflow! Ну это ж классика, ёпта. Представь себе: у тебя пайплайн готов, всё настроено, а данные-то ещё не приехали. И что, запускать обработку пустоты? Да похуй, конечно, нет. Вот тут эти самые сенсоры и спасают — они как сторожевые псы, которые сидят и ждут, пока нужное условие не выполнится. Без них — распиздяйство полное, задачи будут падать как мухи, потому что нихуя не нашли.
Где их впаривают, эти сенсоры:
- Ждём файлик, как манны небесной. Самый частый случай. Допустим, какая-то мартышлюшка из другой команды должна выгрузить отчёт в S3 к 9 утра, а он появляется только к 11, да ещё и кривой. Твой сенсор будет тупо стучать по бакету раз в N минут и орать «Э, сабака сука, где мой файл?». И только когда файл материализуется — пошёл процесс дальше. Овердохуища удобно.
- Зависли от чужого процесса. Бывает, твой DAG — это только кусочек большой цепочки. Сначала должен отработать какой-нибудь монстр на Spark, который данные нагенерят, и только потом твоя чистка. Сенсор может ждать, пока этот монстр не запишет в какую-нибудь табличку статус
SUCCESS. Иначе — сиди, жди, не дергайся. - Данные в базе подтягиваются. Тут тоже всё просто. Нужно, чтобы в таблице
raw_dataпоявились записи за сегодня. Сенсор тупо дергает БД запросом типаSELECT COUNT(*) FROM raw_data WHERE date = TODAY. Пока ноль — спим. Как только число больше нуля — ёб твою мать, поехали!
Вот, смотри, как выглядит этот сторожевой пёс для S3 на практике:
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_for_file = S3KeySensor(
task_id='wait_for_daily_file',
bucket_name='my-data-bucket',
bucket_key='data/landing/{{ ds }}/input.csv',
aws_conn_id='aws_default',
mode='poke',
poke_interval=60, # Тыримся в бакет каждые 60 секунд
timeout=60 * 30 # А через 30 минут терпения ноль ебать — сдаёмся
)
Суть в чём: эта штука (wait_for_daily_file) будет тикать раз в минуту и проверять, не завалился ли в папку data/landing/2024-01-01/ (ну или какая там дата) файлик input.csv. Пока его нет — задача висит в статусе «сенсор». Как только файл объявился — сенсор срабатывает, и следующая задача (например, process_data) получает зелёный свет. А если за полчаса файл так и не появился — ну, значит, сегодня будет нам хиросима, задача упадёт с таймаутом, и можно уже бить тревогу. Красота, а не логика.