Расскажите о своем опыте работы с RabbitMQ в Go. Какие библиотеки и паттерны вы использовали?

Ответ

Да, я работал с RabbitMQ для построения асинхронных и распределенных систем на Go.

Библиотеки

Изначально многие использовали github.com/streadway/amqp, но она уже несколько лет не поддерживается. Сейчас стандартом является официальная, поддерживаемая сообществом и VMware библиотека — github.com/rabbitmq/amqp091-go.

Основные концепции RabbitMQ

Для эффективной работы важно понимать ключевые компоненты:

  1. Producer: Приложение, которое отправляет сообщения.
  2. Exchange (Обменник): Получает сообщения от Producer и направляет их в одну или несколько очередей. Тип обменника (direct, topic, fanout, headers) определяет логику маршрутизации.
  3. Queue (Очередь): Хранит сообщения до тех пор, пока они не будут обработаны.
  4. Binding (Связь): Правило, которое связывает Exchange и Queue.
  5. Consumer: Приложение, которое подписывается на очередь и обрабатывает сообщения.

Используемые паттерны

  • Work Queues (Очереди задач): Для распределения ресурсоемких задач между несколькими обработчиками (workers). Каждое сообщение обрабатывается только одним worker'ом.
  • Publish/Subscribe: Для рассылки одного сообщения всем подписчикам. Используется fanout exchange.
  • Routing: Для выборочной отправки сообщений подписчикам на основе ключа маршрутизации (routing key). Используется direct exchange.
  • Topics: Наиболее гибкий вариант маршрутизации, где подписчики могут использовать шаблоны (wildcards) для подписки на сообщения. Используется topic exchange.

Пример кода (отправка сообщения):

import (
    "context"
    "log"
    "time"
    amqp "github.com/rabbitmq/amqp091-go"
)

// ...

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
// handle err
defer conn.Close()

ch, err := conn.Channel()
// handle err
defer ch.Close()

// Объявляем очередь, куда будем отправлять сообщения
q, err := ch.QueueDeclare(
    "hello", // name
    false,   // durable
    false,   // delete when unused
    false,   // exclusive
    false,   // no-wait
    nil,     // arguments
)
// handle err

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err = ch.PublishWithContext(ctx,
    "",     // exchange (пустое значение - default exchange)
    q.Name, // routing key (имя очереди для default exchange)
    false,  // mandatory
    false,  // immediate
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte("Hello World!"),
    },
)
// handle err

Важные аспекты:

  • Acknowledgements (Подтверждения): Настройка подтверждений (ack/nack) со стороны Consumer'а — ключевой механизм для обеспечения надежной доставки сообщений.
  • Graceful Shutdown: Корректное закрытие соединений и каналов для предотвращения потери данных.