Ответ
Да, я реализовывал асинхронные DQ-проверки для пайплайнов, где немедленная остановка потока была нежелательна.
Пример из проекта: У нас был Airflow DAG, который загружал ежедневные пакеты транзакций. Вместо встроенных проверок в основном DAG, мы создали отдельный DAG для качества данных.
- Основной DAG (
load_transactions) загружал данные в staging-таблицу и по завершению публиковал событие (например, записывал метаданные в служебную таблицу или отправлял сообщение в Kafka). - DQ DAG (
check_transactions_quality) запускался по триггеру от этого события и выполнял проверки:# Пример проверки в Python (Great Expectations или кастомный код) def check_row_count(): # Сравнение количества загруженных строк с ожидаемым pass def check_duplicates(): # Поиск дубликатов по ключевым полям pass def check_value_ranges(): # Проверка, что суммы транзакций положительные pass - Результаты записывались в отдельную таблицу
dq_resultsи отправлялись алертами в Slack/Email, если были найдены критические нарушения.
Преимущество: Основной процесс загрузки не блокировался, и мы могли гибко настраивать пороги срабатывания алертов и глубину проверок.
Ответ 18+ 🔞
Ну вот, сидишь такой, пишешь свой пайплайн, всё летит как по маслу, а потом бац — и тебе нужно впихнуть туда проверку данных. А если проверка накосячит, то весь твой красивый граф накрывается медным тазом, и все последующие таски стоят, как дураки. Пиздец, а не работа.
Так вот, был у меня случай, когда немедленно останавливать поток было нельзя, ну то есть вообще ни в какую. Представь: загрузка ежедневных транзакций, там объёмы — овердохуища. И если из-за какой-нибудь ерунды, вроде одного кривого дубликата, всё встанет колом, то утром бизнес начнёт звонить и спрашивать, где его отчёты. А ты будешь сидеть и объяснять, что «ой, у нас DQ-чек сломался». Поверь, они тебе в сраку чих-пых пошлют, и будут правы.
Поэтому мы сделали хитрую жопу. Вместо того чтобы впендюривать проверки прямо в основной DAG load_transactions, мы отвязали эту историю нахуй. Сделали всё асинхронно.
Вот как это работало, на пальцах:
- Основной загрузчик (
load_transactions) просто делал своё чёрное дело: херачил данные в staging-таблицу. Как закончил — не тупил, а сразу публиковал событие. Типа, «эй, народ, я всё, данные на месте, можете начинать шашлык». Кидал запись в служебную табличку или сообщение в Kafka — без разницы. - А вот отдельно, как чёрт из табакерки, выскакивал DQ DAG (
check_transactions_quality). Он подписывался на это событие и запускался уже после того, как основная загрузка давно отработала и всем похуй. И вот он уже внутри себя начинал свою возню с проверками.
Сам код проверок — да похуй, что там. Можно на Great Expectations, можно свой велосипед. Суть одна:
# Допустим, проверяем, что строк загрузилось не как у дурака
def check_row_count():
# Сравниваем, сколько насчитали, с тем, что ожидали
pass
# Ищем дубликаты — а то вдруг одна транзакция два раза проскочила
def check_duplicates():
# Пробегаемся по ключевым полям
pass
# Ну и на всякий случай, что суммы не отрицательные. А то клиент уйдёт в минус, и всем будет пизда.
def check_value_ranges():
# Проверяем, что всё в адекватных пределах
pass
- Итоги этой всей движухи мы не вываливали в лог, чтобы их потом искали, а аккуратно складывали в отдельную таблицу
dq_results. И если что-то совсем кривое находилось — вот тут уже летели алерты в Slack или на почту. Но главное — основной-то процесс уже давно отработал! Данные загружены, отчёты пошли, все довольны. А мы уже постфактум разбираемся, что там по качеству.
В чём, блядь, соль? Да в том, что загрузка не блочилась нахуй из-за каждой мелочи. Мы получили гибкость: могли настраивать пороги срабатывания алертов («ну ладно, 10 дубликатов — предупреждение, а 100 — уже критично») и глубину проверок, не пугаясь, что всё упадёт. Волнение ебать — ноль. Просто потому что процессы были развязаны. Ёпта, иногда самые простые решения — самые рабочие.