Ответ
Да, у меня есть опыт работы с Kafka, включая написание продюсеров на Go. В основном я использовал библиотеки sarama и confluent-kafka-go.
Основные библиотеки:
github.com/IBM/sarama(или ее предшественникShopify/sarama): Это нативная Go-библиотека. Она удобна, так как не требует CGo и внешних зависимостей. Отлично подходит для большинства задач.github.com/confluentinc/confluent-kafka-go: Это Go-обертка над высокопроизводительной C-библиотекойlibrdkafka. Обычно ее выбирают для систем с очень высокой нагрузкой, где критична каждая миллисекунда.
Пример простого синхронного продюсера с sarama:
package main
import (
"fmt"
"log"
"github.com/IBM/sarama"
)
func main() {
config := sarama.NewConfig()
// Ожидаем подтверждения от лидера брокера
config.Producer.RequiredAcks = sarama.WaitForLocal
// Включаем возврат успешных сообщений
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatalln("Failed to start Sarama producer:", err)
}
defer func() { _ = producer.Close() }()
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("Hello Kafka!"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Fatalln("Failed to send message:", err)
}
fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)n", msg.Topic, partition, offset)
}Ключевые аспекты при работе с продюсером:
- Синхронный vs. Асинхронный продюсер:
SyncProducerблокирует выполнение до получения ответа от брокера, что проще для понимания и гарантирует доставку (при правильной конфигурации).AsyncProducerработает через каналы, обеспечивая гораздо более высокую пропускную способность, но требует более сложной обработки ошибок и подтверждений. - Конфигурация (
RequiredAcks): Важный параметр, определяющий надежность.WaitForLocal(ожидать подтверждения от лидера) — хороший баланс.WaitForAll— максимальная надежность (ожидание от всех реплик),NoResponse— максимальная производительность, но с риском потери данных. - Партиционирование: Для упорядоченной доставки сообщений, связанных с одной сущностью (например, все события для одного пользователя), используется ключ сообщения (
msg.Key). Kafka гарантирует, что все сообщения с одинаковым ключом попадут в одну и ту же партицию. - Идемпотентность: Включение идемпотентного продюсера (
config.Producer.Idempotent = true) защищает от дублирования сообщений при повторных отправках (retries), что критически важно для финансовых и транзакционных систем.