Работали ли вы с CDC (Change Data Capture)?

Ответ

Да, я активно использовал CDC для построения пайплайнов данных в реальном времени. На проекте по созданию операционного дашборда нам требовалось отражать изменения из OLTP-базы (PostgreSQL) в аналитическом хранилище с задержкой в секунды.

Реализация:

  1. Источник: PostgreSQL с включенным логическим декодированием (wal_level = logical).
  2. CDC-инструмент: Debezium для захвата изменений из WAL-лога PostgreSQL и потоковой передачи их в Apache Kafka. Каждая таблица-источник отображалась на отдельный Kafka-топик.
  3. Обработка: Потоковый обработчик на Apache Spark Structured Streaming читал топики Kafka. Для каждого события (INSERT, UPDATE, DELETE) применялась бизнес-логика, и результат записывался в целевую витрину данных в S3 (в формате Delta Lake).

Ключевой код (упрощенно):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "pg-server.public.orders")
  .load()

// Десериализация JSON из Debezium
val ordersChangeData = df.select(from_json(col("value").cast("string"), schema).alias("data"))
  .select("data.payload.op", "data.payload.after", "data.payload.before")

// Применение логики (например, upsert в Delta-таблицу)
ordersChangeData.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    // Обработка вставок и обновлений
    handleUpserts(batchDF, "op in ('c','u')")
    // Обработка удалений
    handleDeletes(batchDF, "op = 'd'")
    batchDF.unpersist()
  }
  .start()

Результат: Мы получили актуальную аналитическую картину с минимальной задержкой, что позволило бизнесу быстро реагировать на изменения.

Ответ 18+ 🔞

Да ты представляешь, какой это был пиздец, когда нам сказали: "Ребята, надо, чтобы данные в дашборде обновлялись не раз в сутки, а прям вот щас, в реальном времени!" Я такой: "Ёпта, ну ладно, давайте тогда CDC подключать, хуй с горы".

Как мы это, блядь, делали:

  1. Откуда брали: Взяли нашу основную базу, PostgreSQL, и включили там логическое декодирование, типа wal_level = logical. Это чтобы она сама себе в лог писала, кто что сделал — вставил, обновил или удалил. Без этого нихуя бы не вышло.
  2. Ловец изменений: Поставили Debezium — эта штука как хитрая жопа, которая сидит, подслушивает этот лог и каждое изменение швыряет в Apache Kafka. Каждая таблица летела в свой отдельный топик, чтобы не превратить всё в одну большую свалку.
  3. Обработка: А дальше наш Apache Spark Structured Streaming эти топики читал, как ненормальный, и для каждого события — будь то новая запись, обновление или удаление — применял нужную бизнес-логику. Итог складывал в витрину в S3, но в формате Delta Lake, чтобы всё аккуратно и с историей было.

Вот, смотри, как код примерно выглядел (я его, конечно, упростил, а то там овердохуища логики было):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "pg-server.public.orders")
  .load()

// Распаковываем JSON, который Debezium нагенерил
val ordersChangeData = df.select(from_json(col("value").cast("string"), schema).alias("data"))
  .select("data.payload.op", "data.payload.after", "data.payload.before")

// И тут начинается магия: применяем логику, типа upsert в Delta-таблицу
ordersChangeData.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    // Ловим вставки и обновления
    handleUpserts(batchDF, "op in ('c','u')")
    // И не забываем про удаления, а то будет пизда рулю
    handleDeletes(batchDF, "op = 'd'")
    batchDF.unpersist()
  }
  .start()

И что в итоге? А в итоге мы получили дашборд, который обновляется с задержкой в пару секунд. Бизнес смотрит — а там уже свежие цифры, можно решения принимать, а не на вчерашние данные пялиться. Честно, сам от себя охуел, когда всё заработало.