Зачем писали кастомные операторы на Airflow?

«Зачем писали кастомные операторы на Airflow?» — вопрос из категории Apache Airflow, который задают на 33% собеседований Data Инженер. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Кастомные операторы в Airflow создаются для инкапсуляции специфичной логики, которую не покрывают встроенные операторы (PythonOperator, BashOperator и т.д.).

Основные причины:

  1. Инкапсуляция и переиспользование: Часто используемую логику (например, загрузку данных в S3 или запрос к внутреннему API) удобно вынести в отдельный оператор и использовать в разных DAG.
  2. Упрощение DAG: Сложная бизнес-логика скрывается внутри оператора, что делает граф задач (DAG) чище и понятнее.
  3. Стандартизация и контроль: Кастомный оператор обеспечивает единообразный способ выполнения задачи с предопределенными параметрами, обработкой ошибок и логированием.
  4. Интеграция со специализированными системами: Для работы с конкретными сервисами (например, отправка алертов в PagerDuty, управление кластером Databricks) часто пишут свой оператор.

Пример: оператор для вызова REST API

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import requests

class CustomApiOperator(BaseOperator):
    @apply_defaults
    def __init__(self, endpoint: str, method: str = 'GET', **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.method = method

    def execute(self, context):
        self.log.info(f'Calling {self.method} {self.endpoint}')
        response = requests.request(method=self.method, url=self.endpoint)
        response.raise_for_status()  # Проверка на HTTP-ошибки
        return response.json()

Использование в DAG становится декларативным: CustomApiOperator(task_id='fetch_data', endpoint='https://api.example.com/data').