Что такое logical date в Apache Airflow?

Ответ

Logical Date (ранее назывался execution_date) — это центральное понятие в Airflow, представляющее логическую дату и время, для которых выполняется DAG Run. Это НЕ фактическое время запуска задачи.

Ключевые моменты:

  • Это точка на временной шкале данных, которые задача должна обработать. Например, для ежедневного DAG, запущенного 10 января для обработки данных за 9 января, logical_date будет 2024-01-09.
  • Используется для шаблонизации через макросы (например, {{ ds }}, {{ logical_date }}).
  • Позволяет выполнять backfill (перезапуск) за прошлые периоды, сохраняя контекст обработки корректных данных.

Пример в коде задачи:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data(**context):
    # logical_date доступен в контексте выполнения
    data_interval_end = context["logical_date"]
    print(f"Обрабатываю данные за период, заканчивающийся: {data_interval_end}")
    # Например, загружаем файл с именем data_2024-01-09.csv

with DAG('my_dag', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:
    run_task = PythonOperator(
        task_id='process',
        python_callable=process_data
    )

Ответ 18+ 🔞

Давай разберёмся с этой штукой под названием logical_date, а то народ постоянно путается, и потом волнение ебать, когда всё падает. Раньше она execution_date называлась, но суть та же.

Представь себе, что ты — повар на кухне. Каждый день в 6 утра ты приходишь и готовишь отчёт по вчерашним заказам. Фактически ты работаешь сегодня, 10 января, но обрабатываешь ты данные за вчера, за 9 января. Так вот, logical_date — это и есть то самое «вчера», 9 января. Это логическая дата, для которой всё и делается. Это НЕ время, когда твой код запустился на сервере.

Зачем это, блядь, нужно? Ну, например, чтобы правильно назвать файл. Ты же не будешь файл с данными за 9 января называть data_2024-01-10.csv? Это же пиздец. Ты используешь шаблоны в Airflow, подставляешь {{ ds }} (это и есть logical_date в формате YYYY-MM-DD), и получаешь красивый data_2024-01-09.csv. Всё логично.

А ещё это позволяет делать backfill — то есть догнать упущенное. Допустим, у тебя неделю скрипт не работал. Ты можешь запустить его, и он по этому самому logical_date поймёт, за какие именно дни нужно обработать данные, а не попытается всё сделать за сегодня. Без этого была бы полная мудя.

Смотри, как в коде выглядит:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data(**context):
    # Вот тут мы вытаскиваем эту самую логическую дату из контекста
    data_interval_end = context["logical_date"]
    print(f"Обрабатываю данные за период, заканчивающийся: {data_interval_end}")
    # И, например, грузим файл с правильным именем
    # load_file(f"data_{data_interval_end}.csv")

with DAG('my_dag', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:
    run_task = PythonOperator(
        task_id='process',
        python_callable=process_data
    )

Короче, запомни раз и навсегда: logical_date — это «за какой день», а не «когда запустился». Перепутаешь — получишь хиросиму в логах и данные по пизде пойдут. Доверия к таким пайплайнам будет ноль ебать.