Основные подходы и библиотеки для работы с Kafka в Go

Ответ

Для работы с Kafka в Go чаще всего используют две популярные библиотеки: segmentio/kafka-go и IBM/sarama. Выбор зависит от требований проекта.

1. IBM/sarama

Это одна из самых старых и зрелых библиотек. Она предоставляет полный контроль над протоколом Kafka.

Ключевые моменты:

  • Гибкость: Позволяет настраивать практически все аспекты взаимодействия с Kafka.
  • Consumer Groups: Современные версии sarama имеют встроенную поддержку ConsumerGroup, которая автоматически обрабатывает ребалансировку партиций. Использование старой библиотеки sarama-cluster не рекомендуется, так как она устарела.
  • Производительность: Поддерживает синхронные и асинхронные продюсеры для разных сценариев.

Пример Producer:

config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
    log.Fatalf("Failed to start Sarama producer: %v", err)
}
defer producer.Close()

msg := &sarama.ProducerMessage{
    Topic: "my-topic",
    Value: sarama.StringEncoder("Hello from Go!"),
}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Fatalf("Failed to send message: %v", err)
}
fmt.Printf("Message sent to partition %d at offset %dn", partition, offset)

Пример Consumer (с использованием Consumer Group):

// Реализация ConsumerGroupHandler
type ExampleConsumerGroupHandler struct{}

func (h *ExampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (h *ExampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h *ExampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("Message claimed: value = %s, topic = %s, partition = %d, offset = %dn", string(msg.Value), msg.Topic, msg.Partition, msg.Offset)
        sess.MarkMessage(msg, "") // Помечаем сообщение как обработанное
    }
    return nil
}

// Запуск консьюмера
client, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
// ...
err = client.Consume(ctx, []string{"my-topic"}, &ExampleConsumerGroupHandler{})

2. segmentio/kafka-go

Более высокоуровневая библиотека, которая предлагает более простой API для частых задач.

Ключевые моменты:

  • Простота: API проще для старта, меньше boilerplate-кода.
  • Автоматизация: Автоматически обрабатывает реконнекты и ребалансировку.
  • Reader/Writer API: Использует удобные абстракции kafka.Reader и kafka.Writer.

Для большинства стандартных задач segmentio/kafka-go является отличным выбором, в то время как sarama предоставляет больше контроля для сложных сценариев.