Ответ
Задача: Разработать отказоустойчивую систему обработки событий с гарантированной доставкой и идемпотентностью при высокой нагрузке (~10 000 событий в секунду).
Проблемы:
- Потеря событий при сбое после обработки бизнес-логики, но до подтверждения.
- Дублирующая обработка при повторной доставке сообщения из брокера (at-least-once delivery).
- Согласованность между изменением состояния в базе данных и фактом обработки события.
Архитектурное решение (паттерны):
-
Transactional Outbox: Для атомарности сохранения в БД и отправки события.
- Событие сначала сохраняется в той же транзакции, что и бизнес-сущность, в таблицу
outbox. - Отдельный процесс (например, CDC или scheduled task) читает
outboxи публикует события в Kafka.
- Событие сначала сохраняется в той же транзакции, что и бизнес-сущность, в таблицу
-
Идемпотентные обработчики: Каждый обработчик в потребителе проверяет, не обрабатывалось ли событие ранее.
Пример кода обработчика в потребителе Kafka:
@Component
public class EventProcessor {
@Autowired
private EventLogService eventLogService; // Сервис для записи в таблицу обработанных событий
@Autowired
private BusinessService businessService;
@Autowired
private TransactionTemplate transactionTemplate;
@KafkaListener(topics = "order-events")
public void handle(OrderEvent event) {
// 1. Проверка идемпотентности
if (eventLogService.isEventProcessed(event.getId())) {
log.info("Event {} already processed, skipping", event.getId());
return;
}
// 2. Обработка в транзакции
transactionTemplate.execute(status -> {
// Бизнес-логика
businessService.processOrder(event.getOrderId(), event.getData());
// Фиксация факта обработки события в БД
eventLogService.markEventAsProcessed(event.getId());
return null;
});
// 3. После успешной транзакции offset в Kafka будет закоммичен
}
}
Итоговые характеристики системы:
- Гарантированная доставка: Событие не потеряется благодаря Outbox.
- Идемпотентность: Повторная доставка того же события не приведет к дублированию бизнес-эффектов.
- Согласованность: Состояние БД и факт обработки события изменяются атомарно в одной транзакции.
Ответ 18+ 🔞
А, ну ты смотри, какие задачи подкидывают! Отказоустойчивость, идемпотентность, десять тысяч событий в секунду... Ёперный театр, да это ж не проект, а минное поле, на котором каждый шаг — пиздец!
Слушай, главная беда-то в чём? Ну, представим: твоя микрослужба получила событие, отработала его, базу обновила, а потом — хрясь! — упала, не успев сказать брокеру «окей, я всё, забирай следующее». И что? А брокер-то, хитрая жопа, думает: «Ага, не подтвердил, значит, не обработал!» И шлёт это же событие снова. И пошла плясать губерния: один заказ — две доставки, один платёж — два списания. Короче, пизда системе и доверию к ней.
И тут на сцену выходит, блядь, паттерн Transactional Outbox. Гениальность в простоте, ебать мои старые костыли! Суть:
- Ты не пишешь сразу в Кафку. Ты — мудак? Она же где-то там, в облаках, а транзакция с базой — тут. Связать их атомарно — хуй с горы.
- Вместо этого ты создаёшь в своей же базе табличку
outbox. И когда делаешь основную бизнес-запись (типаINSERT INTO orders ...), ты в ЭТОЙ ЖЕ, мать его, ТРАНЗАКЦИИ пихаешь событие туда:INSERT INTO outbox (id, topic, payload) VALUES (...). - Всё либо сохраняется вместе, либо откатывается вместе. Красота!
- А потом отдельный, ебушки-воробушки, процесс (типа Дебезиума или простой шедулер) выгребает из
outboxи тащит в Кафку. Он уж точно не потеряет.
Но это только полдела! Событие-то гарантированно уйдёт. А вот когда его будут жрать потребители — там своя банда. Кафка-то может доставить одно и то же сообщение несколько раз (at-least-once delivery, блядь). И вот тут нужен идемпотентный обработчик.
Смотри, как он должен выглядеть, этот красавец:
@Component
public class EventProcessor {
@Autowired
private EventLogService eventLogService; // Этот чувак записывает, какие ивенты мы уже обработали
@Autowired
private BusinessService businessService; // А это уже наша бизнес-логика, которая деньги считает
@Autowired
private TransactionTemplate transactionTemplate; // Чтоб всё в одной транзакции
@KafkaListener(topics = "order-events")
public void handle(OrderEvent event) {
// ШАГ 1: ПРЕДЪЯВИ ПРОПУСК!
// Сначала смотрим, а не обрабатывали ли мы этот ID события уже?
if (eventLogService.isEventProcessed(event.getId())) {
log.info("Event {} already processed, skipping", event.getId()); // Логируем и посылаем нахуй
return;
}
// ШАГ 2: РАБОТАЕМ ПО-СЕРЬЁЗНОМУ
// Всё делаем в одной транзакции — или всё, или ничего.
transactionTemplate.execute(status -> {
// Вот тут наша святая святых: списать деньги, создать заказ, отправить уведомление
businessService.processOrder(event.getOrderId(), event.getData());
// И СРАЗУ ЖЕ, не отходя от кассы, ставим печать: "Обработано, блядь!"
eventLogService.markEventAsProcessed(event.getId());
return null; // Всё, транзакция коммитится
});
// ШАГ 3: Если транзакция прошла, Кафка сама закоммитит offset. Если упала — offset не сдвинется, и событие придет снова. Но мы-то уже проверим его по ID!
}
}
Вот и вся магия, Колян! Итог-то какой получается?
- События не теряются — спасибо Outbox, который как сейф в той же транзакции.
- Дубли не страшны — обработчик, как маньяк, проверяет ID и говорит: «Этот я уже видел, пошёл вон!»
- Всё согласовано — бизнес-логика и отметка «обработано» летят в базу одним махом. Либо оба, либо нихуя.
Получается система, которой можно, условно, доверять. Ну, насколько вообще можно доверять чему-то в этом мире, терпения ебать ноль.