Видел ли интерфейс Kafka?

Ответ

Да, я работал с Kafka, в основном для тестирования и автоматизации проверок интеграций. В моей практике это включало:

  • Настройку тестовых окружений с помощью Docker Compose, где поднимались брокеры Kafka, Zookeeper и создавались необходимые топики.
  • Написание интеграционных тестов с использованием библиотек, таких как spring-kafka-test (для Java) или kafka-python. Например, для проверки, что сервис корректно публикует события в нужный топик.
  • Использование CLI-утилит Kafka для ручных проверок и отладки. Например, чтобы быстро проверить содержимое топика или убедиться в доставке сообщения.

Пример интеграционного теста на Java (с использованием Spring Boot и JUnit 5):

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class OrderServiceTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    void shouldPublishOrderCreatedEvent() throws Exception {
        // Arrange
        String topic = "order-events";
        String expectedEventPayload = "{"orderId":"123"}";
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> consumedPayload = new AtomicReference<>();

        // Act: Подписываемся на топик в тесте
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<String, String>(consumerProps).createConsumer();
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);

        // Запускаем асинхронное потребление
        new Thread(() -> {
            ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
            records.forEach(record -> {
                consumedPayload.set(record.value());
                latch.countDown();
            });
        }).start();

        // Вызываем метод сервиса, который должен отправить событие
        orderService.createOrder("123");

        // Assert
        assertTrue(latch.await(10, TimeUnit.SECONDS), "Сообщение не было получено вовремя");
        assertEquals(expectedEventPayload, consumedPayload.get());
    }
}

Такой подход позволяет убедиться, что система событий работает корректно в рамках end-to-end сценария.

Ответ 18+ 🔞

Слушай, а про Kafka я тебе могу рассказать такую историю, что просто ебать мои старые костыли. Работал с этой штукой, в основном чтобы всякие интеграции проверять и тесты гонять. Ну, понимаешь, чтобы убедиться, что один сервис другому на ухо правильно шепчет, а не хуйню какую-то в топик пишет.

Что конкретно делал? Ну, стандартный такой набор:

  • Тестовые окружения поднимал через Docker Compose. Там этот цирк с брокерами Kafka, Zookeeper'ом, топики создавал — в общем, целый ёперный театр разворачивал, только чтобы потестировать.
  • Интеграционные тесты писал. Это когда ты берёшь библиотеку типа spring-kafka-test и начинаешь проверять: а отправил ли мой сервис событие куда надо? А формат тот? А долетело? Волнение ебать, пока ждёшь результата.
  • В консольку лез, когда всё совсем плохо. CLI-утилиты Kafka — это как лом: грубо, некрасиво, но когда надо срочно заглянуть в топик и понять, что за хуйня там лежит, то ничего лучше нет.

Вот, смотри, пример кода, как это примерно выглядело. Я его не трогаю, оставляю как есть, тут всё технически точно.

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class OrderServiceTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    void shouldPublishOrderCreatedEvent() throws Exception {
        // Arrange
        String topic = "order-events";
        String expectedEventPayload = "{"orderId":"123"}";
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> consumedPayload = new AtomicReference<>();

        // Act: Подписываемся на топик в тесте
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<String, String>(consumerProps).createConsumer();
        embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);

        // Запускаем асинхронное потребление
        new Thread(() -> {
            ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
            records.forEach(record -> {
                consumedPayload.set(record.value());
                latch.countDown();
            });
        }).start();

        // Вызываем метод сервиса, который должен отправить событие
        orderService.createOrder("123");

        // Assert
        assertTrue(latch.await(10, TimeUnit.SECONDS), "Сообщение не было получено вовремя");
        assertEquals(expectedEventPayload, consumedPayload.get());
    }
}

И знаешь, в чём прикол? Когда этот тест зелёный — терпения ноль ебать, но зато ты точно знаешь, что твоя система событий работает не абы как, а по-человечески. Весь сценарий, от начала и до конца, как по нотам. А иначе потом разгребать последствия — это просто пиздец, а не работа.