Расскажите о своем опыте работы с Apache Kafka. Какие задачи решали и какие библиотеки использовали?

Ответ

Да, у меня есть опыт работы с Apache Kafka. Я использовал его для построения асинхронных, событийно-ориентированных систем и для обмена данными между микросервисами.

Основные задачи, которые я решал:

  • Публикация и подписка на сообщения: Создание продюсеров (producers) для отправки событий и консьюмеров (consumers) для их обработки.
  • Организация Consumer Groups: Группировка консьюмеров для параллельной обработки сообщений из топика и обеспечения отказоустойчивости.
  • Гарантии доставки: Настройка механизмов доставки сообщений (at-least-once, at-most-once, exactly-once) в зависимости от требований к системе.
  • Работа с партициями и офсетами: Управление чтением из конкретных партиций и ручное/автоматическое управление смещениями (offsets).

Библиотеки для Go:

Я работал с двумя популярными библиотеками:

  1. github.com/IBM/sarama: Это зрелая и широко используемая библиотека с богатым функционалом.
  2. github.com/confluentinc/confluent-kafka-go: Это обертка над C-библиотекой librdkafka, которая считается эталоном производительности и надежности. Часто выбираю ее для высоконагруженных проектов.

Пример простого продюсера (на confluent-kafka-go):

p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
    log.Fatalf("Failed to create producer: %s", err)
}
defer p.Close()

topic := "test-topic"
msg := &kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          []byte("Hello Kafka"),
}
p.Produce(msg, nil)
p.Flush(15 * 1000) // Ждем доставки сообщения

Пример консьюмера (на confluent-kafka-go):

c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092",
    "group.id":          "my-group",
    "auto.offset.reset": "earliest",
})
if err != nil {
    log.Fatalf("Failed to create consumer: %s", err)
}
defer c.Close()

c.SubscribeTopics([]string{"test-topic"}, nil)

for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
        fmt.Printf("Received from %s: %sn", msg.TopicPartition, string(msg.Value))
    } else {
        fmt.Printf("Consumer error: %v (%v)n", err, msg)
    }
}