Какие брокеры сообщений (очереди) вы использовали и для решения каких задач?

Ответ

В своей практике я активно работал с различными брокерами сообщений, такими как RabbitMQ, Kafka и NATS, для построения отказоустойчивых и масштабируемых систем. Выбор конкретного инструмента всегда зависел от требований задачи.

  • RabbitMQ: Использовал для реализации асинхронных задач (task queues) и сложных сценариев маршрутизации сообщений благодаря протоколу AMQP.

    • Ключевые задачи: фоновая обработка видео, отправка email-уведомлений.
    • Применяемые паттерны: Dead-Letter-Queues для обработки ошибочных сообщений, Publish/Subscribe для рассылки событий нескольким потребителям, подтверждение получения сообщений (ack/nack) для гарантии доставки.
  • Kafka: Применял в системах, где требовалась обработка больших потоков данных в реальном времени (стриминг).

    • Ключевые задачи: сбор и агрегация логов, event sourcing, построение data pipelines.
    • Особенности: работал с партициями для горизонтального масштабирования, Consumer Groups для параллельной обработки и настраивал семантику доставки (at-least-once, exactly-once).
  • NATS / NATS JetStream: Выбирал для систем, где в приоритете была максимальная производительность и минимальная задержка.

    • Ключевые задачи: межсервисное взаимодействие в реальном времени, рассылка метрик, RPC-подобные вызовы.
    • Особенности: NATS Core идеален для сценариев fire-and-forget, в то время как NATS JetStream добавляет персистентность и гарантии доставки, приближаясь по возможностям к Kafka/RabbitMQ, но сохраняя простоту и скорость.

Пример простого издателя (publisher) для RabbitMQ на Go:

// Для подключения к RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
// ... обработка ошибок
defer conn.Close()

ch, err := conn.Channel()
// ... обработка ошибок
defer ch.Close()

// Объявляем очередь, чтобы убедиться, что она существует
q, err := ch.QueueDeclare(
    "tasks", // name
    true,    // durable (очередь переживет перезапуск брокера)
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
)

// Публикуем сообщение
err = ch.Publish(
    "",       // exchange
    q.Name,   // routing key
    false,    // mandatory
    false,    // immediate
    amqp.Publishing{
        DeliveryMode: amqp.Persistent, // сообщение переживет перезапуск брокера
        ContentType:  "text/plain",
        Body:         []byte("данные для обработки"),
    })