Опишите ваш опыт тестирования систем на основе Apache Kafka.

Ответ

Работал с Apache Kafka в рамках тестирования event-driven микросервисных архитектур. Основные активности:

  • Интеграционное и E2E-тестирование: Проверка корректности отправки, маршрутизации и обработки событий между сервисами.
  • Валидация данных: Контроль схемы сообщений (использовал Apache Avro с Schema Registry), порядка следования (ordering) и семантики доставки (at-least-once, exactly-once).
  • Тестирование отказоустойчивости: Проверка поведения при падении консьюмеров, обработка сообщений в Dead Letter Queue (DLQ), ретраи.
  • Мониторинг и отладка: Использовал Kafkacat, UI for Apache Kafka и Grafana с метриками Kafka для наблюдения за лагом консьюмеров и состоянием топиков.

Пример автотеста на Java (Kafka Clients API) для проверки отправки и получения:

// Настройка продюсера
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); // Гарантия доставки

try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
    // Отправка синхронно с проверкой
    RecordMetadata metadata = producer.send(
        new ProducerRecord<>("orders", "order-123", "{"status":"new"}")
    ).get();
    System.out.println("Sent to partition: " + metadata.partition());
}

// Настройка консьюмера для валидации
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.subscribe(List.of("orders"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
    // Проверка полученного сообщения
    records.forEach(record -> {
        assert record.key().equals("order-123");
        assert record.value().contains(""status":"new"");
    });
}

Ответ 18+ 🔞

Да ты посмотри, какой у нас тут специалист по асинхронному говну подъехал! Ну ладно, шучу. Работал, значит, с этой вашей Apache Kafka, пока тестировал эти event-driven микросервисы, которые друг другу сообщения шлют, как записочки на уроке.

Основные мои подвиги, если без соплей:

  • Интеграционное и сквозное тестирование: Следил, чтобы одно приложение своё событие отправило, другое его получило и не обосралось по дороге. Маршрутизация там, обработка — вся эта муть.
  • Валидация данных: А вот это, блядь, важно! Чтобы схема сообщения не поплыла. Использовал Apache Avro с Schema Registry, чтобы каждый новый разработчик не придумывал свой велосипед в JSON. Следил за порядком сообщений и за тем, чтобы они доходили — хоть один раз, хоть ровно один раз. Хуй там, exactly-once — это ж священный грааль, его ещё поискать.
  • Тестирование отказоустойчивости: Любимая часть. Вырубаешь консьюмер — смотришь, как остальные начинают суетиться. Проверял, как сообщения-инвалиды летят в Dead Letter Queue (DLQ), этакую палату для безнадёжных. И ретраи, эти бесконечные попытки «эй, ну прими же меня!».
  • Мониторинг и отладка: Без этого — пипец. Смотрел в Kafkacat, тыкался в UI for Apache Kafka, а в Grafana графики по лагам консьюмеров строил. Потому что если лаг растёт — скоро все нахуй посыпятся, как карточный домик.

А вот пример кода, на котором я мозги себе ебал, пытаясь проверить, что сообщение ушло и пришло:

// Тут мы настраиваем продюсера, чувака, который будет говорить
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); // Гарантия, что все брокеры кивнут, что получили

try (Producer<String, String> producer = new KafkaProducer<>(producerProps)) {
    // Отправляем синхронно, потому что нам надо быть уверенными, как в аптеке
    RecordMetadata metadata = producer.send(
        new ProducerRecord<>("orders", "order-123", "{"status":"new"}")
    ).get(); // .get() — это наше всё, ждём ответа, как дураки
    System.out.println("Отправили в партицию: " + metadata.partition());
}

// А теперь настраиваем консьюмера, подслушивающего у двери
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // Читаем всё с самого начала, как чистые листы
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

try (Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.subscribe(List.of("orders")); // Подписываемся на топик с заказами
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10)); // Ждём сообщений, как манны небесной
    // И вот она, проверка, ради которой всё затевалось
    records.forEach(record -> {
        assert record.key().equals("order-123"); // Ключ тот ли?
        assert record.value().contains(""status":"new""); // Значение не обосралось?
    });
}

Вот так вот, епта. Кажется, просто отправил-получил, а на деле — волнение ебать, каждый раз как в первый раз. Вдруг продюсер загнётся, консьюмер уснёт, а сообщение так и будет болтаться в лимбе. Красота!