Ответ
Да, у меня есть опыт работы с Kafka, включая написание продюсеров на Go. В основном я использовал библиотеки sarama
и confluent-kafka-go
.
Основные библиотеки:
github.com/IBM/sarama
(или ее предшественникShopify/sarama
): Это нативная Go-библиотека. Она удобна, так как не требует CGo и внешних зависимостей. Отлично подходит для большинства задач.github.com/confluentinc/confluent-kafka-go
: Это Go-обертка над высокопроизводительной C-библиотекойlibrdkafka
. Обычно ее выбирают для систем с очень высокой нагрузкой, где критична каждая миллисекунда.
Пример простого синхронного продюсера с sarama
:
package main
import (
"fmt"
"log"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
// Ожидаем подтверждения от лидера брокера
config.Producer.RequiredAcks = sarama.WaitForLocal
// Включаем возврат успешных сообщений
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
defer func() { _ = producer.Close() }()
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("Hello Kafka!"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Fatalln("Failed to send message:", err)
}
fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)n", msg.Topic, partition, offset)
}
Ключевые аспекты при работе с продюсером:
- Синхронный vs. Асинхронный продюсер:
SyncProducer
блокирует выполнение до получения ответа от брокера, что проще для понимания и гарантирует доставку (при правильной конфигурации).AsyncProducer
работает через каналы, обеспечивая гораздо более высокую пропускную способность, но требует более сложной обработки ошибок и подтверждений. - Конфигурация (
RequiredAcks
): Важный параметр, определяющий надежность.WaitForLocal
(ожидать подтверждения от лидера) — хороший баланс.WaitForAll
— максимальная надежность (ожидание от всех реплик),NoResponse
— максимальная производительность, но с риском потери данных. - Партиционирование: Для упорядоченной доставки сообщений, связанных с одной сущностью (например, все события для одного пользователя), используется ключ сообщения (
msg.Key
). Kafka гарантирует, что все сообщения с одинаковым ключом попадут в одну и ту же партицию. - Идемпотентность: Включение идемпотентного продюсера (
config.Producer.Idempotent = true
) защищает от дублирования сообщений при повторных отправках (retries), что критически важно для финансовых и транзакционных систем.