Ответ
Да, я работал с Kafka, в основном для тестирования и автоматизации проверок интеграций. В моей практике это включало:
- Настройку тестовых окружений с помощью Docker Compose, где поднимались брокеры Kafka, Zookeeper и создавались необходимые топики.
- Написание интеграционных тестов с использованием библиотек, таких как
spring-kafka-test(для Java) илиkafka-python. Например, для проверки, что сервис корректно публикует события в нужный топик. - Использование CLI-утилит Kafka для ручных проверок и отладки. Например, чтобы быстро проверить содержимое топика или убедиться в доставке сообщения.
Пример интеграционного теста на Java (с использованием Spring Boot и JUnit 5):
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class OrderServiceTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Test
void shouldPublishOrderCreatedEvent() throws Exception {
// Arrange
String topic = "order-events";
String expectedEventPayload = "{"orderId":"123"}";
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> consumedPayload = new AtomicReference<>();
// Act: Подписываемся на топик в тесте
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<String, String>(consumerProps).createConsumer();
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
// Запускаем асинхронное потребление
new Thread(() -> {
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
records.forEach(record -> {
consumedPayload.set(record.value());
latch.countDown();
});
}).start();
// Вызываем метод сервиса, который должен отправить событие
orderService.createOrder("123");
// Assert
assertTrue(latch.await(10, TimeUnit.SECONDS), "Сообщение не было получено вовремя");
assertEquals(expectedEventPayload, consumedPayload.get());
}
}
Такой подход позволяет убедиться, что система событий работает корректно в рамках end-to-end сценария.
Ответ 18+ 🔞
Слушай, а про Kafka я тебе могу рассказать такую историю, что просто ебать мои старые костыли. Работал с этой штукой, в основном чтобы всякие интеграции проверять и тесты гонять. Ну, понимаешь, чтобы убедиться, что один сервис другому на ухо правильно шепчет, а не хуйню какую-то в топик пишет.
Что конкретно делал? Ну, стандартный такой набор:
- Тестовые окружения поднимал через Docker Compose. Там этот цирк с брокерами Kafka, Zookeeper'ом, топики создавал — в общем, целый ёперный театр разворачивал, только чтобы потестировать.
- Интеграционные тесты писал. Это когда ты берёшь библиотеку типа
spring-kafka-testи начинаешь проверять: а отправил ли мой сервис событие куда надо? А формат тот? А долетело? Волнение ебать, пока ждёшь результата. - В консольку лез, когда всё совсем плохо. CLI-утилиты Kafka — это как лом: грубо, некрасиво, но когда надо срочно заглянуть в топик и понять, что за хуйня там лежит, то ничего лучше нет.
Вот, смотри, пример кода, как это примерно выглядело. Я его не трогаю, оставляю как есть, тут всё технически точно.
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class OrderServiceTest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@Test
void shouldPublishOrderCreatedEvent() throws Exception {
// Arrange
String topic = "order-events";
String expectedEventPayload = "{"orderId":"123"}";
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> consumedPayload = new AtomicReference<>();
// Act: Подписываемся на топик в тесте
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<String, String>(consumerProps).createConsumer();
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
// Запускаем асинхронное потребление
new Thread(() -> {
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
records.forEach(record -> {
consumedPayload.set(record.value());
latch.countDown();
});
}).start();
// Вызываем метод сервиса, который должен отправить событие
orderService.createOrder("123");
// Assert
assertTrue(latch.await(10, TimeUnit.SECONDS), "Сообщение не было получено вовремя");
assertEquals(expectedEventPayload, consumedPayload.get());
}
}
И знаешь, в чём прикол? Когда этот тест зелёный — терпения ноль ебать, но зато ты точно знаешь, что твоя система событий работает не абы как, а по-человечески. Весь сценарий, от начала и до конца, как по нотам. А иначе потом разгребать последствия — это просто пиздец, а не работа.