Расскажите о своем опыте написания Kafka Producer на Go. Какие библиотеки использовали и с какими особенностями сталкивались?

Ответ

Да, у меня есть опыт работы с Kafka, включая написание продюсеров на Go. В основном я использовал библиотеки sarama и confluent-kafka-go.

Основные библиотеки:

  • github.com/IBM/sarama (или ее предшественник Shopify/sarama): Это нативная Go-библиотека. Она удобна, так как не требует CGo и внешних зависимостей. Отлично подходит для большинства задач.
  • github.com/confluentinc/confluent-kafka-go: Это Go-обертка над высокопроизводительной C-библиотекой librdkafka. Обычно ее выбирают для систем с очень высокой нагрузкой, где критична каждая миллисекунда.

Пример простого синхронного продюсера с sarama:

package main

import (
    "fmt"
    "log"

    "github.com/IBM/sarama"
)

func main() {
    config := sarama.NewConfig()
    // Ожидаем подтверждения от лидера брокера
    config.Producer.RequiredAcks = sarama.WaitForLocal
    // Включаем возврат успешных сообщений
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalln("Failed to start Sarama producer:", err)
    }
    defer func() { _ = producer.Close() }()

    msg := &sarama.ProducerMessage{
        Topic: "test-topic",
        Value: sarama.StringEncoder("Hello Kafka!"),
    }

    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Fatalln("Failed to send message:", err)
    }

    fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)n", msg.Topic, partition, offset)
}

Ключевые аспекты при работе с продюсером:

  • Синхронный vs. Асинхронный продюсер: SyncProducer блокирует выполнение до получения ответа от брокера, что проще для понимания и гарантирует доставку (при правильной конфигурации). AsyncProducer работает через каналы, обеспечивая гораздо более высокую пропускную способность, но требует более сложной обработки ошибок и подтверждений.
  • Конфигурация (RequiredAcks): Важный параметр, определяющий надежность. WaitForLocal (ожидать подтверждения от лидера) — хороший баланс. WaitForAll — максимальная надежность (ожидание от всех реплик), NoResponse — максимальная производительность, но с риском потери данных.
  • Партиционирование: Для упорядоченной доставки сообщений, связанных с одной сущностью (например, все события для одного пользователя), используется ключ сообщения (msg.Key). Kafka гарантирует, что все сообщения с одинаковым ключом попадут в одну и ту же партицию.
  • Идемпотентность: Включение идемпотентного продюсера (config.Producer.Idempotent = true) защищает от дублирования сообщений при повторных отправках (retries), что критически важно для финансовых и транзакционных систем.