Ответ
Да, у меня есть опыт работы с Apache Kafka. Я использовал его для построения асинхронных, событийно-ориентированных систем и для обмена данными между микросервисами.
Основные задачи, которые я решал:
- Публикация и подписка на сообщения: Создание продюсеров (producers) для отправки событий и консьюмеров (consumers) для их обработки.
- Организация Consumer Groups: Группировка консьюмеров для параллельной обработки сообщений из топика и обеспечения отказоустойчивости.
- Гарантии доставки: Настройка механизмов доставки сообщений (at-least-once, at-most-once, exactly-once) в зависимости от требований к системе.
- Работа с партициями и офсетами: Управление чтением из конкретных партиций и ручное/автоматическое управление смещениями (offsets).
Библиотеки для Go:
Я работал с двумя популярными библиотеками:
github.com/IBM/sarama: Это зрелая и широко используемая библиотека с богатым функционалом.github.com/confluentinc/confluent-kafka-go: Это обертка над C-библиотекойlibrdkafka, которая считается эталоном производительности и надежности. Часто выбираю ее для высоконагруженных проектов.
Пример простого продюсера (на confluent-kafka-go):
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer p.Close()
topic := "test-topic"
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte("Hello Kafka"),
}
p.Produce(msg, nil)
p.Flush(15 * 1000) // Ждем доставки сообщения
Пример консьюмера (на confluent-kafka-go):
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "my-group",
"auto.offset.reset": "earliest",
})
if err != nil {
log.Fatalf("Failed to create consumer: %s", err)
}
defer c.Close()
c.SubscribeTopics([]string{"test-topic"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Received from %s: %sn", msg.TopicPartition, string(msg.Value))
} else {
fmt.Printf("Consumer error: %v (%v)n", err, msg)
}
} Ответ 18+ 🔞
А, опыт с Кафкой? Да, блядь, был, как же без него в наше время-то! Всё эти микросервисы, события, асинхронщина — без Кафки нихуя не построишь, это ж аксиома, как дважды два — хуй в пальто.
Чем я, собственно, занимался, пока мозги не начали плавиться:
- Писал и читал сообщения: Делал продюсеров, которые шлют события, и консьюмеров, которые их жрут. Всё как в жизни: кто-то производит, а кто-то потребляет, ёпта.
- Тусил в группах: Собирал консьюмеров в кучки (Consumer Groups), чтобы они сообща топик обрабатывали и друг друга подстраховывали. А то один отвалится — и пиздец, всё на нём встало.
- Гарантии доставки выкручивал: Настраивал эту хуйню — at-least-once, at-most-once, exactly-once. Последний — это вообще ёперный театр, там надо так заморочиться, что волосы дыбом встают. Но если бизнес требует — делаешь, блядь.
- Возился с партициями и офсетами: Разбирался, откуда читать и куда писать. Иногда офсет вручную двигал — как будто на разбитой машине коробку переключаешь, скрежещет всё, но едет.
Библиотеки на Go, с которыми руки пачкал:
github.com/IBM/sarama— старая, добрая, проверенная. Всё в ней есть, как в швейцарском армейском ноже, включая отвёртку, которой можно глаз себе выковырять, если неаккуратно.github.com/confluentinc/confluent-kafka-go— это уже обёртка над нативной Си-шной либой (librdkafka). Она, сука, быстрая как чёрт и надёжная как швейцарские часы. Для серьёзных нагрузок — это наш выбор, бро... то есть, выбор, блядь.
Вот, смотри, как простенький продюсер на confluent-kafka-go выглядит (код не трогаю, он святой):
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer p.Close()
topic := "test-topic"
msg := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte("Hello Kafka"),
}
p.Produce(msg, nil)
p.Flush(15 * 1000) // Ждем доставки сообщения
А вот консьюмер, который сидит и ждёт, когда ему что-нибудь прилетит в топик:
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "my-group",
"auto.offset.reset": "earliest",
})
if err != nil {
log.Fatalf("Failed to create consumer: %s", err)
}
defer c.Close()
c.SubscribeTopics([]string{"test-topic"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Received from %s: %sn", msg.TopicPartition, string(msg.Value))
} else {
fmt.Printf("Consumer error: %v (%v)n", err, msg)
}
}
Вот так вот, в двух словах. А если вдаваться в детали — про ретеншены, компрессии, кворумы реплик и прочую магию — это на целую сагу, блядь. Но основа — она вот тут, простая и понятная, как ёбушки-воробушки.