Что такое топик (topic) в Kafka?

Ответ

Топик (topic) в Kafka — это именованный поток сообщений, который служит для их категоризации. Можно представить топик как таблицу в базе данных или папку в файловой системе, но для потоковых данных.

Продюсеры (producers) отправляют сообщения в топики, а консьюмеры (consumers) подписываются на них, чтобы читать эти сообщения.

Ключевые характеристики топика:

  • Именование: Каждый топик имеет уникальное имя в кластере Kafka.
  • Партиционирование (Partitioning): Топик разделен на одну или несколько партиций. Партиции — это упорядоченные, неизменяемые последовательности сообщений. Разделение на партиции позволяет распараллелить обработку данных: несколько консьюмеров из одной группы могут читать сообщения из разных партиций одного топика одновременно. Порядок сообщений гарантируется только в пределах одной партиции.
  • Неизменяемость (Immutability): Сообщения в партициях являются append-only, то есть их нельзя изменить или удалить после записи.
  • Хранение и Retention Policy: Kafka хранит сообщения в топиках определенное время (например, 7 дней) или до достижения определенного размера, после чего старые данные удаляются.

Пример создания топика в Go (с использованием confluent-kafka-go):

package main

import (
    "context"
    "log"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
    admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        log.Fatalf("Не удалось создать AdminClient: %v", err)
    }
    defer admin.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Создаем топик с 3 партициями и фактором репликации 1
    // Фактор репликации — сколько брокеров будут хранить копию каждой партиции
    _, err = admin.CreateTopics(ctx, []kafka.TopicSpecification{{
        Topic:             "user-events",
        NumPartitions:     3,
        ReplicationFactor: 1,
    }})

    if err != nil {
        log.Fatalf("Не удалось создать топик: %v", err)
    }
    log.Println("Топик 'user-events' успешно создан")
}

Ответ 18+ 🔞

Ну, слушай, смотри, вот есть у тебя Кафка, да? И в ней, блядь, самое главное — это топик. Представь себе, что это такая папка, куда все твои данные летят, как дерьмо в сортире. Только сортир этот — упорядоченный и его не смывает, пока ты сам не нажмёшь на кнопку.

Ну, и что это за зверь такой, топик?

  • Имя. У него есть имя, как у кота. "Васька", "Мурзик", "user-events". Без имени — это просто какая-то хуйня, а не топик.
  • Партиции (Partitioning). Вот тут самое интересное, ёпта! Топик — он не цельный кусок. Он, как пицца, режется на куски — партиции. Зачем? А чтобы не один консьюмер, этот обжора, всё жрал, а несколько могли одновременно в разные куски впиваться. Порядок сообщений гарантирован только внутри одного куска пиццы. Взял кусок с ананасами — там ананасы по порядку лежат. А что в соседнем куске с грибами творится — тебя не ебёт.
  • Неизменяемость (Immutability). Записал сообщение в партицию — всё, пиши пропало. Не выковыришь, не перепишешь. Это как татуха на жопе — навсегда. Только добавлять новые можно.
  • Хранение (Retention Policy). Кафка — не помойка вечная. Она хранит твои данные, пока не надоест. Скажешь "7 дней" — через неделю старые сообщения нахуй полетят. Или скажешь "храни, пока 1 ГБ не наберётся" — как переполнится, самые старые в утиль.

Вот, смотри, как эту хуйню на Go создать:

package main

import (
    "context"
    "log"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
    admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        log.Fatalf("Не удалось создать AdminClient: %v", err)
    }
    defer admin.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // Создаем топик с 3 партициями и фактором репликации 1
    // Фактор репликации — сколько брокеров будут хранить копию каждой партиции
    _, err = admin.CreateTopics(ctx, []kafka.TopicSpecification{{
        Topic:             "user-events",
        NumPartitions:     3,
        ReplicationFactor: 1,
    }})

    if err != nil {
        log.Fatalf("Не удалось создать топик: %v", err)
    }
    log.Println("Топик 'user-events' успешно создан")
}

Видишь? Создаём топик user-events. Режем его на три партиции (NumPartitions: 3). И говорим, чтобы каждая партиция жила в одном экземпляре (ReplicationFactor: 1). Если брокер сдохнет — ну, ёбта, прощай, данные. Для надёжности ставь больше, но это уже другая история, про которую я тебе, может, когда-нибудь расскажу, если водки будет.