Для каких задач применял сенсоры в Apache Airflow?

Ответ

В Airflow сенсоры (Sensors) я использовал для ожидания выполнения внешних условий перед запуском DAG или задачи. Это позволяет строить отказоустойчивые и зависимые пайплайны.

Основные сценарии применения:

  1. Ожидание появления файла: Запуск обработки только после того, как в S3-бакете или на SFTP-сервере появится файл с данными за определённую дату.
  2. Зависимость от завершения внешнего процесса: Ожидание, пока другая система (например, Spark-джоба или сторонний API) завершит свою работу и выставит флаг (например, запишет метку в БД).
  3. Проверка доступности данных в БД: Ожидание, когда в целевой таблице появятся свежие записи, необходимые для следующего этапа трансформации.

Пример сенсора для ожидания файла в S3:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id='wait_for_daily_file',
    bucket_name='my-data-bucket',
    bucket_key='data/landing/{{ ds }}/input.csv',
    aws_conn_id='aws_default',
    mode='poke',
    poke_interval=60,  # Проверять каждые 60 секунд
    timeout=60 * 30    # Таймаут 30 минут
)

Сенсор будет «пинговать» S3 раз в минуту, и задача process_data запустится только после того, как файл появится.

Ответ 18+ 🔞

А, сенсоры в Airflow! Ну это ж классика, ёпта. Представь себе: у тебя пайплайн готов, всё настроено, а данные-то ещё не приехали. И что, запускать обработку пустоты? Да похуй, конечно, нет. Вот тут эти самые сенсоры и спасают — они как сторожевые псы, которые сидят и ждут, пока нужное условие не выполнится. Без них — распиздяйство полное, задачи будут падать как мухи, потому что нихуя не нашли.

Где их впаривают, эти сенсоры:

  1. Ждём файлик, как манны небесной. Самый частый случай. Допустим, какая-то мартышлюшка из другой команды должна выгрузить отчёт в S3 к 9 утра, а он появляется только к 11, да ещё и кривой. Твой сенсор будет тупо стучать по бакету раз в N минут и орать «Э, сабака сука, где мой файл?». И только когда файл материализуется — пошёл процесс дальше. Овердохуища удобно.
  2. Зависли от чужого процесса. Бывает, твой DAG — это только кусочек большой цепочки. Сначала должен отработать какой-нибудь монстр на Spark, который данные нагенерят, и только потом твоя чистка. Сенсор может ждать, пока этот монстр не запишет в какую-нибудь табличку статус SUCCESS. Иначе — сиди, жди, не дергайся.
  3. Данные в базе подтягиваются. Тут тоже всё просто. Нужно, чтобы в таблице raw_data появились записи за сегодня. Сенсор тупо дергает БД запросом типа SELECT COUNT(*) FROM raw_data WHERE date = TODAY. Пока ноль — спим. Как только число больше нуля — ёб твою мать, поехали!

Вот, смотри, как выглядит этот сторожевой пёс для S3 на практике:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id='wait_for_daily_file',
    bucket_name='my-data-bucket',
    bucket_key='data/landing/{{ ds }}/input.csv',
    aws_conn_id='aws_default',
    mode='poke',
    poke_interval=60,  # Тыримся в бакет каждые 60 секунд
    timeout=60 * 30    # А через 30 минут терпения ноль ебать — сдаёмся
)

Суть в чём: эта штука (wait_for_daily_file) будет тикать раз в минуту и проверять, не завалился ли в папку data/landing/2024-01-01/ (ну или какая там дата) файлик input.csv. Пока его нет — задача висит в статусе «сенсор». Как только файл объявился — сенсор срабатывает, и следующая задача (например, process_data) получает зелёный свет. А если за полчаса файл так и не появился — ну, значит, сегодня будет нам хиросима, задача упадёт с таймаутом, и можно уже бить тревогу. Красота, а не логика.