Ответ
Да, я работал с RabbitMQ для построения асинхронных и распределенных систем на Go.
Библиотеки
Изначально многие использовали github.com/streadway/amqp, но она уже несколько лет не поддерживается. Сейчас стандартом является официальная, поддерживаемая сообществом и VMware библиотека — github.com/rabbitmq/amqp091-go.
Основные концепции RabbitMQ
Для эффективной работы важно понимать ключевые компоненты:
- Producer: Приложение, которое отправляет сообщения.
- Exchange (Обменник): Получает сообщения от Producer и направляет их в одну или несколько очередей. Тип обменника (
direct,topic,fanout,headers) определяет логику маршрутизации. - Queue (Очередь): Хранит сообщения до тех пор, пока они не будут обработаны.
- Binding (Связь): Правило, которое связывает Exchange и Queue.
- Consumer: Приложение, которое подписывается на очередь и обрабатывает сообщения.
Используемые паттерны
- Work Queues (Очереди задач): Для распределения ресурсоемких задач между несколькими обработчиками (workers). Каждое сообщение обрабатывается только одним worker'ом.
- Publish/Subscribe: Для рассылки одного сообщения всем подписчикам. Используется
fanoutexchange. - Routing: Для выборочной отправки сообщений подписчикам на основе ключа маршрутизации (
routing key). Используетсяdirectexchange. - Topics: Наиболее гибкий вариант маршрутизации, где подписчики могут использовать шаблоны (wildcards) для подписки на сообщения. Используется
topicexchange.
Пример кода (отправка сообщения):
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// ...
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
// handle err
defer conn.Close()
ch, err := conn.Channel()
// handle err
defer ch.Close()
// Объявляем очередь, куда будем отправлять сообщения
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
// handle err
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = ch.PublishWithContext(ctx,
"", // exchange (пустое значение - default exchange)
q.Name, // routing key (имя очереди для default exchange)
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello World!"),
},
)
// handle errВажные аспекты:
- Acknowledgements (Подтверждения): Настройка подтверждений (
ack/nack) со стороны Consumer'а — ключевой механизм для обеспечения надежной доставки сообщений. - Graceful Shutdown: Корректное закрытие соединений и каналов для предотвращения потери данных.