Ответ
Да, я работал с RabbitMQ для построения асинхронных и распределенных систем на Go.
Библиотеки
Изначально многие использовали github.com/streadway/amqp, но она уже несколько лет не поддерживается. Сейчас стандартом является официальная, поддерживаемая сообществом и VMware библиотека — github.com/rabbitmq/amqp091-go.
Основные концепции RabbitMQ
Для эффективной работы важно понимать ключевые компоненты:
- Producer: Приложение, которое отправляет сообщения.
- Exchange (Обменник): Получает сообщения от Producer и направляет их в одну или несколько очередей. Тип обменника (
direct,topic,fanout,headers) определяет логику маршрутизации. - Queue (Очередь): Хранит сообщения до тех пор, пока они не будут обработаны.
- Binding (Связь): Правило, которое связывает Exchange и Queue.
- Consumer: Приложение, которое подписывается на очередь и обрабатывает сообщения.
Используемые паттерны
- Work Queues (Очереди задач): Для распределения ресурсоемких задач между несколькими обработчиками (workers). Каждое сообщение обрабатывается только одним worker'ом.
- Publish/Subscribe: Для рассылки одного сообщения всем подписчикам. Используется
fanoutexchange. - Routing: Для выборочной отправки сообщений подписчикам на основе ключа маршрутизации (
routing key). Используетсяdirectexchange. - Topics: Наиболее гибкий вариант маршрутизации, где подписчики могут использовать шаблоны (wildcards) для подписки на сообщения. Используется
topicexchange.
Пример кода (отправка сообщения):
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// ...
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
// handle err
defer conn.Close()
ch, err := conn.Channel()
// handle err
defer ch.Close()
// Объявляем очередь, куда будем отправлять сообщения
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
// handle err
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = ch.PublishWithContext(ctx,
"", // exchange (пустое значение - default exchange)
q.Name, // routing key (имя очереди для default exchange)
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello World!"),
},
)
// handle err
Важные аспекты:
- Acknowledgements (Подтверждения): Настройка подтверждений (
ack/nack) со стороны Consumer'а — ключевой механизм для обеспечения надежной доставки сообщений. - Graceful Shutdown: Корректное закрытие соединений и каналов для предотвращения потери данных.
Ответ 18+ 🔞
Ну, слушай, вот сидишь ты такой, пишешь на Go, и тут тебе надо, чтобы одно приложение на другом конце сервера пиздануло сообщение, а второе его подхватило, да не сразу, а когда захочет. И тут, блядь, на сцену выходит RabbitMQ — этакая мартышлюшка-посредник, которая всё организует.
Раньше все, как дурачки, юзали streadway/amqp, а она, сука, уже лет пять как накрылась медным тазом. Теперь надо брать официальную — github.com/rabbitmq/amqp091-go. Её хоть кто-то поддерживает, не то что те древние артефакты.
Чтобы не выглядеть полным дегенератом, запомни главные термины, а то потом будешь как Герасим — мычать «Муму» и нихуя не понимать.
Producer — это тот, кто пиздаёт сообщения. Как я, когда начинаю нести хуйню после трёх бутылок пива.
Exchange (Обменник) — получает эти посылы и решает, куда их дальше толкать. У него есть свои приколы: direct (точно в цель), topic (по шаблону, типа «всё, что про собак»), fanout (всем сестрам по серьгам) и headers (для совсем ебнутых сценариев).
Queue (Очередь) — это, блядь, просто очередь. Сообщения тут тупо висят, пока их не заберут.
Binding — невидимая ниточка, которая связывает обменник и очередь. Без неё — пиздец, сообщения улетают в никуда.
Consumer — это алкаш, который подходит к стойке бара (очереди) и забирает свою порцию сообщений.
Паттерны, которые реально работают:
- Work Queues — когда задач дохуища, и ты нанимаешь толпу рабов (workers), чтобы они их расхерачивали. Одно сообщение — один раб.
- Publish/Subscribe — это когда ты орал на всю деревню, а тебя слышали все, у кого уши не заложило. Используешь
fanoutexchange. - Routing — уже поумнее. Кричишь «Собирайся, Вань!», и только Ваня откликается. Это
directexchange. - Topics — военная хитрость. Можешь орать «собака.*», и тебя услышат все, у кого есть «собака.лает» или «собака.спит».
topicexchange, ёпта.
А вот, смотри, как отправить сообщение. Просто скопируй этот кусок кода, не еби мозг:
import (
"context"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// ...
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
// handle err
defer conn.Close()
ch, err := conn.Channel()
// handle err
defer ch.Close()
// Объявляем очередь, куда будем отправлять сообщения
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
// handle err
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = ch.PublishWithContext(ctx,
"", // exchange (пустое значение - default exchange)
q.Name, // routing key (имя очереди для default exchange)
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("Hello World!"),
},
)
// handle err
И главное, блядь, не забудь про две вещи, а то наебнёшься:
- Acknowledgements (Подтверждения). Consumer должен явно сказать: «О, спасибо, сообщение получил, обработал, всё чики-пуки» (ack). Или «На хуй это дерьмо, дай другое» (nack). Без этого твои сообщения будут теряться, как носки в стиральной машине.
- Graceful Shutdown. Когда приложение закрывается, надо аккуратно прибраться: закрыть каналы и соединение. А то останешься как тот мудак, который ушёл, не заплатив в баре, — на тебя все собаки будут вешать лапшу.