Ответ
Kafka Producer — это клиентское приложение, отправляющее сообщения (записи) в топики Kafka.
Основные шаги создания:
- Конфигурация: Определение свойств подключения и сериализации.
- Создание экземпляра: Инициализация
KafkaProducer. - Отправка сообщений: Использование метода
send(). - Завершение работы: Обязательное закрытие продюсера через
close().
Пример кода на Java:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
// 1. Конфигурация
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Опционально: настройки производительности
props.put("acks", "all");
props.put("retries", 3);
// 2. Создание продюсера
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
// 3. Создание и отправка записи
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "message-key", "Hello, Kafka!");
// Отправка с callback для обработки результата (асинхронно)
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Message sent to topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
producer.flush(); // Гарантирует отправку всех сообщений
} // 4. Автоматическое закрытие в try-with-resources
}
}
Ключевые практики:
- Обязательно закрывайте продюсера для освобождения ресурсов.
- Используйте callback (
Callback) для обработки подтверждений (acks) и ошибок. - Настройте
batch.sizeиlinger.msдля оптимизации производительности при высокой нагрузке. - Для идемпотентности и строгого порядка доставки настройте
enable.idempotence=trueиmax.in.flight.requests.per.connection=1.