Как тестировать системы с брокерами очередей (RabbitMQ, Kafka)?

«Как тестировать системы с брокерами очередей (RabbitMQ, Kafka)?» — вопрос из категории Архитектура, который задают на 10% собеседований QA Тестировщик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Тестирование фокусируется на гарантированной доставке сообщений, отказоустойчивости и корректности обработки.

RabbitMQ (AMQP):

  • Проверка маршрутизации: Корректность работы обменников (exchanges), типов (direct, topic, fanout) и привязок (bindings).
  • Надежность доставки: Подтверждение получения (acknowledgement), персистентность сообщений и очередей (durable).
  • Тестирование сценариев: Поведение системы при потере соединения, переполнении очереди.

Apache Kafka:

  • Семантика доставки: Проверка at-least-once / exactly-once доставки.
  • Партиционирование и порядок: Корректность назначения партиций, порядок сообщений в пределах партиции.
  • Офсеты: Корректность управления смещениями (offsets) консьюмером.
  • Отказоустойчивость: Поведение при потере лидера партиции (re-election).

Пример интеграционного теста для RabbitMQ с Pika:

import pika
import json

def test_message_published_and_consumed():
    # Конфигурация теста
    connection_params = pika.ConnectionParameters(host='localhost')
    test_queue = 'test.queue'
    test_message = {'event': 'user.registered', 'id': 42}

    # Публикация сообщения
    with pika.BlockingConnection(connection_params) as conn:
        channel = conn.channel()
        channel.queue_declare(queue=test_queue, durable=True)
        channel.basic_publish(
            exchange='',
            routing_key=test_queue,
            body=json.dumps(test_message),
            properties=pika.BasicProperties(delivery_mode=2) # persistent
        )

    # Чтение и проверка сообщения
    with pika.BlockingConnection(connection_params) as conn:
        channel = conn.channel()
        method_frame, header_frame, body = channel.basic_get(queue=test_queue, auto_ack=True)

        assert method_frame is not None, "Сообщение не доставлено в очередь"
        received_message = json.loads(body)
        assert received_message == test_message

Для Kafka часто используются встроенные тестовые кластеры (например, @EmbeddedKafka в Spring) или инструменты вроде kafkacat для ручных проверок.