Ответ
Кастомные операторы в Airflow создаются для инкапсуляции специфичной логики, которую не покрывают встроенные операторы (PythonOperator, BashOperator и т.д.).
Основные причины:
- Инкапсуляция и переиспользование: Часто используемую логику (например, загрузку данных в S3 или запрос к внутреннему API) удобно вынести в отдельный оператор и использовать в разных DAG.
- Упрощение DAG: Сложная бизнес-логика скрывается внутри оператора, что делает граф задач (
DAG) чище и понятнее. - Стандартизация и контроль: Кастомный оператор обеспечивает единообразный способ выполнения задачи с предопределенными параметрами, обработкой ошибок и логированием.
- Интеграция со специализированными системами: Для работы с конкретными сервисами (например, отправка алертов в PagerDuty, управление кластером Databricks) часто пишут свой оператор.
Пример: оператор для вызова REST API
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import requests
class CustomApiOperator(BaseOperator):
@apply_defaults
def __init__(self, endpoint: str, method: str = 'GET', **kwargs):
super().__init__(**kwargs)
self.endpoint = endpoint
self.method = method
def execute(self, context):
self.log.info(f'Calling {self.method} {self.endpoint}')
response = requests.request(method=self.method, url=self.endpoint)
response.raise_for_status() # Проверка на HTTP-ошибки
return response.json()
Использование в DAG становится декларативным: CustomApiOperator(task_id='fetch_data', endpoint='https://api.example.com/data').