Опишите сложную задачу из прошлого проекта, связанную с обработкой событий

Ответ

Задача: Разработать отказоустойчивую систему обработки событий с гарантированной доставкой и идемпотентностью при высокой нагрузке (~10 000 событий в секунду).

Проблемы:

  1. Потеря событий при сбое после обработки бизнес-логики, но до подтверждения.
  2. Дублирующая обработка при повторной доставке сообщения из брокера (at-least-once delivery).
  3. Согласованность между изменением состояния в базе данных и фактом обработки события.

Архитектурное решение (паттерны):

  1. Transactional Outbox: Для атомарности сохранения в БД и отправки события.

    • Событие сначала сохраняется в той же транзакции, что и бизнес-сущность, в таблицу outbox.
    • Отдельный процесс (например, CDC или scheduled task) читает outbox и публикует события в Kafka.
  2. Идемпотентные обработчики: Каждый обработчик в потребителе проверяет, не обрабатывалось ли событие ранее.

Пример кода обработчика в потребителе Kafka:

@Component
public class EventProcessor {
    @Autowired
    private EventLogService eventLogService; // Сервис для записи в таблицу обработанных событий
    @Autowired
    private BusinessService businessService;
    @Autowired
    private TransactionTemplate transactionTemplate;

    @KafkaListener(topics = "order-events")
    public void handle(OrderEvent event) {
        // 1. Проверка идемпотентности
        if (eventLogService.isEventProcessed(event.getId())) {
            log.info("Event {} already processed, skipping", event.getId());
            return;
        }

        // 2. Обработка в транзакции
        transactionTemplate.execute(status -> {
            // Бизнес-логика
            businessService.processOrder(event.getOrderId(), event.getData());
            // Фиксация факта обработки события в БД
            eventLogService.markEventAsProcessed(event.getId());
            return null;
        });
        // 3. После успешной транзакции offset в Kafka будет закоммичен
    }
}

Итоговые характеристики системы:

  • Гарантированная доставка: Событие не потеряется благодаря Outbox.
  • Идемпотентность: Повторная доставка того же события не приведет к дублированию бизнес-эффектов.
  • Согласованность: Состояние БД и факт обработки события изменяются атомарно в одной транзакции.

Ответ 18+ 🔞

А, ну ты смотри, какие задачи подкидывают! Отказоустойчивость, идемпотентность, десять тысяч событий в секунду... Ёперный театр, да это ж не проект, а минное поле, на котором каждый шаг — пиздец!

Слушай, главная беда-то в чём? Ну, представим: твоя микрослужба получила событие, отработала его, базу обновила, а потом — хрясь! — упала, не успев сказать брокеру «окей, я всё, забирай следующее». И что? А брокер-то, хитрая жопа, думает: «Ага, не подтвердил, значит, не обработал!» И шлёт это же событие снова. И пошла плясать губерния: один заказ — две доставки, один платёж — два списания. Короче, пизда системе и доверию к ней.

И тут на сцену выходит, блядь, паттерн Transactional Outbox. Гениальность в простоте, ебать мои старые костыли! Суть:

  1. Ты не пишешь сразу в Кафку. Ты — мудак? Она же где-то там, в облаках, а транзакция с базой — тут. Связать их атомарно — хуй с горы.
  2. Вместо этого ты создаёшь в своей же базе табличку outbox. И когда делаешь основную бизнес-запись (типа INSERT INTO orders ...), ты в ЭТОЙ ЖЕ, мать его, ТРАНЗАКЦИИ пихаешь событие туда: INSERT INTO outbox (id, topic, payload) VALUES (...).
  3. Всё либо сохраняется вместе, либо откатывается вместе. Красота!
  4. А потом отдельный, ебушки-воробушки, процесс (типа Дебезиума или простой шедулер) выгребает из outbox и тащит в Кафку. Он уж точно не потеряет.

Но это только полдела! Событие-то гарантированно уйдёт. А вот когда его будут жрать потребители — там своя банда. Кафка-то может доставить одно и то же сообщение несколько раз (at-least-once delivery, блядь). И вот тут нужен идемпотентный обработчик.

Смотри, как он должен выглядеть, этот красавец:

@Component
public class EventProcessor {
    @Autowired
    private EventLogService eventLogService; // Этот чувак записывает, какие ивенты мы уже обработали
    @Autowired
    private BusinessService businessService; // А это уже наша бизнес-логика, которая деньги считает
    @Autowired
    private TransactionTemplate transactionTemplate; // Чтоб всё в одной транзакции

    @KafkaListener(topics = "order-events")
    public void handle(OrderEvent event) {
        // ШАГ 1: ПРЕДЪЯВИ ПРОПУСК!
        // Сначала смотрим, а не обрабатывали ли мы этот ID события уже?
        if (eventLogService.isEventProcessed(event.getId())) {
            log.info("Event {} already processed, skipping", event.getId()); // Логируем и посылаем нахуй
            return;
        }

        // ШАГ 2: РАБОТАЕМ ПО-СЕРЬЁЗНОМУ
        // Всё делаем в одной транзакции — или всё, или ничего.
        transactionTemplate.execute(status -> {
            // Вот тут наша святая святых: списать деньги, создать заказ, отправить уведомление
            businessService.processOrder(event.getOrderId(), event.getData());
            // И СРАЗУ ЖЕ, не отходя от кассы, ставим печать: "Обработано, блядь!"
            eventLogService.markEventAsProcessed(event.getId());
            return null; // Всё, транзакция коммитится
        });
        // ШАГ 3: Если транзакция прошла, Кафка сама закоммитит offset. Если упала — offset не сдвинется, и событие придет снова. Но мы-то уже проверим его по ID!
    }
}

Вот и вся магия, Колян! Итог-то какой получается?

  • События не теряются — спасибо Outbox, который как сейф в той же транзакции.
  • Дубли не страшны — обработчик, как маньяк, проверяет ID и говорит: «Этот я уже видел, пошёл вон!»
  • Всё согласовано — бизнес-логика и отметка «обработано» летят в базу одним махом. Либо оба, либо нихуя.

Получается система, которой можно, условно, доверять. Ну, насколько вообще можно доверять чему-то в этом мире, терпения ебать ноль.