Работали ли вы с Apache Airflow?

Ответ

Да, я использовал Apache Airflow для оркестрации ETL/ELT-пайплайнов и планирования задач. Вот ключевые аспекты моего опыта:

1. Разработка DAG-ов:

  • Создание Directed Acyclic Graphs для описания зависимостей задач.
  • Использование различных операторов: PythonOperator, BashOperator, DockerOperator, а также кастомных операторов для интеграции с внутренними API.
  • Пример DAG для ежедневной загрузки данных:
    
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
    from datetime import datetime, timedelta

def _process_data():

Логика предобработки данных

import pandas as pd
df = pd.read_csv('/tmp/data.csv')
df['processed_at'] = datetime.now()
df.to_parquet('/tmp/data_processed.parquet')

default_args = { 'owner': 'data_team', 'retries': 3, 'retry_delay': timedelta(minutes=5) }

with DAG( dag_id='daily_data_pipeline', default_args=default_args, start_date=datetime(2023, 1, 1), schedule_interval='0 2 *', # Каждый день в 02:00 catchup=False ) as dag:

process = PythonOperator(
    task_id='process_raw_data',
    python_callable=_process_data
)

load = GCSToBigQueryOperator(
    task_id='load_to_bigquery',
    bucket='my-data-bucket',
    source_objects=['processed/data_*.parquet'],
    destination_project_dataset_table='project.dataset.daily_sales',
    write_disposition='WRITE_APPEND',
    source_format='PARQUET'
)

process >> load  # Определение зависимости


**2. Мониторинг и управление:**
*   Настройка алертирования при падении задач (интеграция с Slack, Email).
*   Использование **пулов (pools)** для ограничения параллельного выполнения ресурсоемких задач.
*   Работа с **XCom** для передачи небольших данных между задачами.

**3. Развертывание и инфраструктура:**
*   Запуск Airflow в **Docker** для локальной разработки.
*   Развертывание в продакшене на **Kubernetes** с использованием официального Helm-чарта для масштабирования и отказоустойчивости.

Airflow стал центральным инструментом для обеспечения надежности, повторяемости и мониторинга наших фоновых процессов.

Ответ 18+ 🔞

А, Airflow, говоришь? Ну, это ж классика, ебать мои старые костыли! Как будто на дворе 2002-й год, но работает — хуй с горы. Использовал, конечно, куда ж без него, когда надо эти ваши ETL-пайплайны по расписанию гонять, чтобы всё не вручную, как последние мартышлюшки.

1. Про DAG-и эти ваши: Ну, это когда ты рисуешь, кто за кем должен бежать, чтобы не получилось, что одна задача ждёт другую, а та уже давно накрылась медным тазом. Писал эти графы, Directed Acyclic Graphs, короче. Использовал кучу операторов: PythonOperator — чтобы свой скрипт питоновский впихнуть, BashOperator — чтобы пошурудить в консоли, DockerOperator — чтобы в контейнере всё изолированно крутилось. А ещё свои, кастомные, лепил, когда надо было к какому-нибудь внутреннему API доебаться, у которого документация — пиздопроебибна.

Вот, смотри, пример, как мы ежедневную выгрузку делали. Всё по науке: сначала данные обработать, потом в хранилище засунуть.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime, timedelta

def _process_data():
    # Логика предобработки данных
    import pandas as pd
    df = pd.read_csv('/tmp/data.csv')
    df['processed_at'] = datetime.now()
    df.to_parquet('/tmp/data_processed.parquet')

default_args = {
    'owner': 'data_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='daily_data_pipeline',
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval='0 2 * * *',  # Каждый день в 02:00
    catchup=False
) as dag:

    process = PythonOperator(
        task_id='process_raw_data',
        python_callable=_process_data
    )

    load = GCSToBigQueryOperator(
        task_id='load_to_bigquery',
        bucket='my-data-bucket',
        source_objects=['processed/data_*.parquet'],
        destination_project_dataset_table='project.dataset.daily_sales',
        write_disposition='WRITE_APPEND',
        source_format='PARQUET'
    )

    process >> load  # Определение зависимости

Видишь, в конце process >> load? Это я говорю системе: «Э, сабака сука, сначала process, потом load, а не наоборот!» А то будет тебе хиросима и нигерсраки.

2. Слежка и управление: Тут без мониторинга — нихуя. Настроил алерты в Slack, чтобы, если задача сдохла, мне сразу писали «Чувак, у тебя там всё хуёво». Пуллы (pools) — вещь офигенная, когда у тебя есть задачи, которые жрут ресурсов овердохуища. Чтоб они все разом не запустились и сервак не лег, ты им говоришь: «Ребята, в этом бассейне максимум два человека!» И они очередь соблюдают. А XCom — это такая хитрая жопа для передачи мелких данных между задачами. Типа «Вот, держи, братан, на следующем шаге пригодится».

3. Как это всё жило: Локально, ясное дело, в Docker всё крутил — удобно, быстро. А в продакшене — Kubernetes, ёпта. Развернул через Helm-чарт официальный, и пусть себе масштабируется, как хочет. Главное — отказоустойчивость, чтобы если один воркер сдох, другие подхватили.

В общем, Airflow стал такой центральной штукой, вокруг которой все эти фоновые процессы пляшут. Надёжность, повторяемость, и главное — видно, что где-то зависло и почему. Без него был бы пиздец, а не пайплайны.