Как написать Kafka Producer на Java?

«Как написать Kafka Producer на Java?» — вопрос из категории Брокеры сообщений, который задают на 10% собеседований Java Разработчик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Kafka Producer — это клиентское приложение, отправляющее сообщения (записи) в топики Kafka.

Основные шаги создания:

  1. Конфигурация: Определение свойств подключения и сериализации.
  2. Создание экземпляра: Инициализация KafkaProducer.
  3. Отправка сообщений: Использование метода send().
  4. Завершение работы: Обязательное закрытие продюсера через close().

Пример кода на Java:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class SimpleProducer {
    public static void main(String[] args) {
        // 1. Конфигурация
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Опционально: настройки производительности
        props.put("acks", "all");
        props.put("retries", 3);

        // 2. Создание продюсера
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 3. Создание и отправка записи
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("my-topic", "message-key", "Hello, Kafka!");

            // Отправка с callback для обработки результата (асинхронно)
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Message sent to topic=%s, partition=%d, offset=%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
            producer.flush(); // Гарантирует отправку всех сообщений
        } // 4. Автоматическое закрытие в try-with-resources
    }
}

Ключевые практики:

  • Обязательно закрывайте продюсера для освобождения ресурсов.
  • Используйте callback (Callback) для обработки подтверждений (acks) и ошибок.
  • Настройте batch.size и linger.ms для оптимизации производительности при высокой нагрузке.
  • Для идемпотентности и строгого порядка доставки настройте enable.idempotence=true и max.in.flight.requests.per.connection=1.