Видел ли интерфейс Kafka?

«Видел ли интерфейс Kafka?» — вопрос из категории Брокеры сообщений, который задают на 24% собеседований AQA / Automation. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Да, я работал с Kafka, в основном для тестирования и автоматизации проверок интеграций. В моей практике это включало:

  • Настройку тестовых окружений с помощью Docker Compose, где поднимались брокеры Kafka, Zookeeper и создавались необходимые топики.
  • Написание интеграционных тестов с использованием библиотек, таких как spring-kafka-test (для Java) или kafka-python. Например, для проверки, что сервис корректно публикует события в нужный топик.
  • Использование CLI-утилит Kafka для ручных проверок и отладки. Например, чтобы быстро проверить содержимое топика или убедиться в доставке сообщения.

Пример интеграционного теста на Java (с использованием Spring Boot и JUnit 5):

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class OrderServiceTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    void shouldPublishOrderCreatedEvent() throws Exception {
        // Arrange
        String topic = "order-events";
        String expectedEventPayload = "{"orderId":"123"}";
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> consumedPayload = new AtomicReference<>();

        // Act: Подписываемся на топик в тесте
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<String, String>(consumerProps).createConsumer();
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);

        // Запускаем асинхронное потребление
        new Thread(() -> {
            ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
            records.forEach(record -> {
                consumedPayload.set(record.value());
                latch.countDown();
            });
        }).start();

        // Вызываем метод сервиса, который должен отправить событие
        orderService.createOrder("123");

        // Assert
        assertTrue(latch.await(10, TimeUnit.SECONDS), "Сообщение не было получено вовремя");
        assertEquals(expectedEventPayload, consumedPayload.get());
    }
}

Такой подход позволяет убедиться, что система событий работает корректно в рамках end-to-end сценария.