Ответ
Да, я активно использовал CDC для построения пайплайнов данных в реальном времени. На проекте по созданию операционного дашборда нам требовалось отражать изменения из OLTP-базы (PostgreSQL) в аналитическом хранилище с задержкой в секунды.
Реализация:
- Источник: PostgreSQL с включенным логическим декодированием (
wal_level = logical). - CDC-инструмент: Debezium для захвата изменений из WAL-лога PostgreSQL и потоковой передачи их в Apache Kafka. Каждая таблица-источник отображалась на отдельный Kafka-топик.
- Обработка: Потоковый обработчик на Apache Spark Structured Streaming читал топики Kafka. Для каждого события (INSERT, UPDATE, DELETE) применялась бизнес-логика, и результат записывался в целевую витрину данных в S3 (в формате Delta Lake).
Ключевой код (упрощенно):
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "pg-server.public.orders")
.load()
// Десериализация JSON из Debezium
val ordersChangeData = df.select(from_json(col("value").cast("string"), schema).alias("data"))
.select("data.payload.op", "data.payload.after", "data.payload.before")
// Применение логики (например, upsert в Delta-таблицу)
ordersChangeData.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
// Обработка вставок и обновлений
handleUpserts(batchDF, "op in ('c','u')")
// Обработка удалений
handleDeletes(batchDF, "op = 'd'")
batchDF.unpersist()
}
.start()
Результат: Мы получили актуальную аналитическую картину с минимальной задержкой, что позволило бизнесу быстро реагировать на изменения.
Ответ 18+ 🔞
Да ты представляешь, какой это был пиздец, когда нам сказали: "Ребята, надо, чтобы данные в дашборде обновлялись не раз в сутки, а прям вот щас, в реальном времени!" Я такой: "Ёпта, ну ладно, давайте тогда CDC подключать, хуй с горы".
Как мы это, блядь, делали:
- Откуда брали: Взяли нашу основную базу, PostgreSQL, и включили там логическое декодирование, типа
wal_level = logical. Это чтобы она сама себе в лог писала, кто что сделал — вставил, обновил или удалил. Без этого нихуя бы не вышло. - Ловец изменений: Поставили Debezium — эта штука как хитрая жопа, которая сидит, подслушивает этот лог и каждое изменение швыряет в Apache Kafka. Каждая таблица летела в свой отдельный топик, чтобы не превратить всё в одну большую свалку.
- Обработка: А дальше наш Apache Spark Structured Streaming эти топики читал, как ненормальный, и для каждого события — будь то новая запись, обновление или удаление — применял нужную бизнес-логику. Итог складывал в витрину в S3, но в формате Delta Lake, чтобы всё аккуратно и с историей было.
Вот, смотри, как код примерно выглядел (я его, конечно, упростил, а то там овердохуища логики было):
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker:9092")
.option("subscribe", "pg-server.public.orders")
.load()
// Распаковываем JSON, который Debezium нагенерил
val ordersChangeData = df.select(from_json(col("value").cast("string"), schema).alias("data"))
.select("data.payload.op", "data.payload.after", "data.payload.before")
// И тут начинается магия: применяем логику, типа upsert в Delta-таблицу
ordersChangeData.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
// Ловим вставки и обновления
handleUpserts(batchDF, "op in ('c','u')")
// И не забываем про удаления, а то будет пизда рулю
handleDeletes(batchDF, "op = 'd'")
batchDF.unpersist()
}
.start()
И что в итоге? А в итоге мы получили дашборд, который обновляется с задержкой в пару секунд. Бизнес смотрит — а там уже свежие цифры, можно решения принимать, а не на вчерашние данные пялиться. Честно, сам от себя охуел, когда всё заработало.