Какой у вас опыт тестирования систем, использующих RabbitMQ?

Ответ

Имею опыт интеграционного и end-to-end тестирования микросервисных архитектур, где RabbitMQ выступает в роли брокера сообщений. Основные фокусы тестирования:

  • Надёжность доставки: гарантированная доставка сообщений (persistent messages, publisher confirms).
  • Обработка ошибок и повторов: корректная работа Dead Letter Exchanges (DLX) и политик повторной обработки.
  • Маршрутизация: правильность работы разных типов exchange (direct, fanout, topic, headers).
  • Устойчивость к сбоям: поведение системы при недоступности RabbitMQ.

Пример интеграционного теста на Python с библиотекой pika и docker для поднятия изолированного брокера:

import pika
import json
import time
from docker import DockerClient

def test_message_routing_with_topic_exchange():
    """Тест проверяет, что сообщения правильно маршрутизируются topic exchange'ом."""
    # Запускаем RabbitMQ в Docker для изоляции теста
    docker_client = DockerClient.from_env()
    container = docker_client.containers.run(
        "rabbitmq:3-management",
        detach=True,
        ports={"5672/tcp": 5672},
        remove=True
    )
    time.sleep(10)  # Ждем старта контейнера

    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()

        # Объявляем topic exchange и две очереди с разными routing key
        channel.exchange_declare(exchange='logs.topic', exchange_type='topic')
        channel.queue_declare(queue='queue.app.errors')
        channel.queue_declare(queue='queue.app.logs')
        channel.queue_bind(exchange='logs.topic', queue='queue.app.errors', routing_key='app.error.*')
        channel.queue_bind(exchange='logs.topic', queue='queue.app.logs', routing_key='app.#')

        # Публикуем сообщения
        channel.basic_publish(exchange='logs.topic', routing_key='app.error.database', body='DB connection failed')
        channel.basic_publish(exchange='logs.topic', routing_key='app.info.startup', body='Application started')

        # Проверяем, что сообщения попали в правильные очереди
        method_frame, _, body = channel.basic_get(queue='queue.app.errors')
        assert body.decode() == 'DB connection failed'
        if method_frame:
            channel.basic_ack(method_frame.delivery_tag)

        method_frame, _, body = channel.basic_get(queue='queue.app.logs')
        assert body.decode() == 'Application started'
        if method_frame:
            channel.basic_ack(method_frame.delivery_tag)

        # Убеждаемся, что в очереди ошибок только одно сообщение
        method_frame, _, _ = channel.basic_get(queue='queue.app.errors')
        assert method_frame is None

        connection.close()
    finally:
        # Останавливаем и удаляем контейнер
        container.stop()

Для тестирования также использовал RabbitMQ Management HTTP API для проверки состояния очередей и инструменты нагрузочного тестирования (например, rabbitmq-perf-test).

Ответ 18+ 🔞

А, ну ты глянь, какой серьёзный дядя с микросервисами! Самый сок, когда эти все сервисы начинают через кролика друг другу письма любовные слать. А мы, тестировщики, тут как в почтовом отделении — проверяем, не потерялось ли чего, не тому ли адресу ушло, и что будет, если почтальона, то есть самого RabbitMQ, внезапно по голове кирпичом трахнуть.

Основные заморочки, на которых я обычно мозги ломаю, это:

  • Дойдёт или нет? Чтобы сообщение не сгинуло в пучине, если брокер чихнёт. Persistent messages, publisher confirms — вот это всё, святое.
  • А если обосрётся? Ну, сервис, не человек. Как он с ошибками справляется, куда трупы сообщений скидывает (эти ваши Dead Letter Exchanges), и не задолбает ли он всех своими бесконечными повторными отправками.
  • Туда ли пошло? Чтобы письмо «я тебя люблю» не ушло в очередь «жалобы от клиентов». Direct, fanout, topic — каждый exchange как начальник с особыми приколами по маршрутизации.
  • А если кролик сдохнет? Что будет с системой, когда брокер накрывается медным тазом. Паника, истерика или достойное ожидание с повторными подключениями.

Вот, смотри, как я обычно такой topic exchange на живом примере проверяю, на Python'е:

import pika
import json
import time
from docker import DockerClient

def test_message_routing_with_topic_exchange():
    """Тест проверяет, что сообщения правильно маршрутизируются topic exchange'ом."""
    # Поднимаем своего кролика в докере, чтоб никому не мешать
    docker_client = DockerClient.from_env()
    container = docker_client.containers.run(
        "rabbitmq:3-management",
        detach=True,
        ports={"5672/tcp": 5672},
        remove=True
    )
    time.sleep(10)  # Даём бедняге проснуться, ебать его в сраку

    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        channel = connection.channel()

        # Организуем раздаточный пункт (exchange) и две очереди-приёмщицы
        channel.exchange_declare(exchange='logs.topic', exchange_type='topic')
        channel.queue_declare(queue='queue.app.errors')
        channel.queue_declare(queue='queue.app.logs')
        # Привязываем их с умными ключами маршрутизации
        channel.queue_bind(exchange='logs.topic', queue='queue.app.errors', routing_key='app.error.*')
        channel.queue_bind(exchange='logs.topic', queue='queue.app.logs', routing_key='app.#')

        # Публикуем две депеши
        channel.basic_publish(exchange='logs.topic', routing_key='app.error.database', body='DB connection failed')
        channel.basic_publish(exchange='logs.topic', routing_key='app.info.startup', body='Application started')

        # А теперь идём с проверкой — куда что припёрлось?
        method_frame, _, body = channel.basic_get(queue='queue.app.errors')
        assert body.decode() == 'DB connection failed'  # Ошибка — в очередь ошибок!
        if method_frame:
            channel.basic_ack(method_frame.delivery_tag)

        method_frame, _, body = channel.basic_get(queue='queue.app.logs')
        assert body.decode() == 'Application started'  # А инфо-логи — в общую очередь!
        if method_frame:
            channel.basic_ack(method_frame.delivery_tag)

        # И смотрим, не затесалось ли в ошибки чего лишнего
        method_frame, _, _ = channel.basic_get(queue='queue.app.errors')
        assert method_frame is None  # Пусто? Отлично, маршрутизация не подвела!

        connection.close()
    finally:
        # А в конце, как истинный сапёр, убираем за собой контейнер
        container.stop()

Ну и, конечно, без HTTP API RabbitMQ Management никуда — тыкаешь в него палкой, чтобы узнать, сколько сообщений в очереди висит, как здоровье у соединений. А для нагрузочного тестирования — rabbitmq-perf-test в руки, и вперёд, выяснять, когда у кролика начнётся нервный тик и он начнёт терять сообщения. Ёперный театр, в общем.