Как Apache Kafka собирает и обрабатывает потоки данных?

«Как Apache Kafka собирает и обрабатывает потоки данных?» — вопрос из категории Архитектура, который задают на 24% собеседований PHP Разработчик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Apache Kafka — распределенная система потоковой обработки данных, работающая по модели публикация-подписка. Рассмотрим архитектуру и процесс сбора данных.

Архитектурные компоненты:

  1. Producer (Продюсер) — отправляет сообщения в топики
  2. Topic (Топик) — категория/поток сообщений
  3. Partition (Партиция) — параллельная единица топика для масштабирования
  4. Consumer (Консьюмер) — читает сообщения из топиков
  5. Broker (Брокер) — сервер Kafka, хранящий данные
  6. ZooKeeper — координация кластера (в новых версиях заменяется на KRaft)

Процесс сбора данных:

// Пример продюсера на Java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class DataCollectorProducer {
    public static void main(String[] args) {
        // Конфигурация продюсера
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // Гарантия доставки
        props.put("retries", 3); // Повторные попытки

        // Создание продюсера
        Producer<String, String> producer = new KafkaProducer<>(props);

        try {
            // Сбор данных из источника (например, IoT-устройства)
            SensorData sensorData = collectSensorData();

            // Создание сообщения
            String key = sensorData.getDeviceId(); // Ключ определяет партицию
            String value = sensorData.toJson();

            // Отправка в топик "sensor-readings"
            ProducerRecord<String, String> record = 
                new ProducerRecord<>("sensor-readings", key, value);

            // Асинхронная отправка с callback
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.printf(
                            "Сообщение отправлено в топик=%s, партиция=%d, offset=%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset()
                        );
                    } else {
                        System.err.println("Ошибка отправки: " + exception.getMessage());
                    }
                }
            });

        } finally {
            producer.flush(); // Отправка оставшихся сообщений
            producer.close(); // Закрытие соединения
        }
    }
}

Распределение по партициям:

// Ключ сообщения определяет партицию через хеширование
// Сообщения с одинаковым ключом попадают в одну партицию

// Пример 1: С ключом (гарантированный порядок для deviceId)
ProducerRecord<String, String> record1 = 
    new ProducerRecord<>("logs", "server-123", "Error: Disk full");
// Все сообщения от server-123 будут в одной партиции

// Пример 2: Без ключа (round-robin распределение)
ProducerRecord<String, String> record2 = 
    new ProducerRecord<>("metrics", null, "CPU: 45%");
// Сообщения распределяются по партициям циклически

Обработка на стороне брокера:

  1. Прием сообщений: Брокер получает сообщения от продюсеров
  2. Распределение: Сообщения распределяются по партициям топика
  3. Сохранение: Данные записываются в сегменты на диск
  4. Репликация: Каждая партиция реплицируется на несколько брокеров
  5. Коммит лога: Сообщения подтверждаются согласно настройкам acks

Конфигурация топика:

# Создание топика с настройками
kafka-topics.sh --create 
  --topic user-actions 
  --partitions 6            # 6 партиций для параллельной обработки
  --replication-factor 3    # 3 реплики для отказоустойчивости
  --config retention.ms=604800000   # Хранение 7 дней
  --config cleanup.policy=delete 
  --bootstrap-server localhost:9092

Потоковая обработка с Kafka Streams:

// Обработка данных в реальном времени
StreamsBuilder builder = new StreamsBuilder();

// Чтение из входного топика
KStream<String, String> source = builder.stream("raw-sensor-data");

// Преобразование данных
KStream<String, Double> processed = source
    .filter((key, value) -> value != null)
    .mapValues(value -> parseTemperature(value))
    .filter((key, temp) -> temp > -50 && temp < 100) // Валидация
    .map((key, temp) -> KeyValue.pair(key, celsiusToFahrenheit(temp)));

// Агрегация (оконные операции)
KTable<Windowed<String>, Long> hourlyCounts = processed
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofHours(1)))
    .count();

// Запись в выходной топик
processed.to("processed-sensor-data");
hourlyCounts.toStream().to("sensor-stats-hourly");

// Запуск приложения обработки
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();

Гарантии доставки:

  • At least once: Сообщения не теряются, но могут дублироваться
  • At most once: Сообщения могут теряться, но не дублируются
  • Exactly once: Идеальный вариант (требует транзакций)

Мониторинг и управление:

# Просмотр смещений (offsets)
kafka-consumer-groups.sh --describe 
  --group data-processors 
  --bootstrap-server localhost:9092

# Просмотр сообщений в топике
kafka-console-consumer.sh 
  --topic user-actions 
  --from-beginning 
  --bootstrap-server localhost:9092

# Проверка состояния кластера
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

Использование в микросервисной архитектуре:

# Пример конфигурации для сбора логов
producer:
  topic: "application-logs"
  partitions: 12
  replication: 3
  compression: snappy  # Сжатие для экономии трафика
  batch-size: 16384    # Размер батча для оптимизации
  linger-ms: 5         # Задержка перед отправкой батча

consumer:
  group-id: "log-aggregator"
  auto-offset-reset: "latest"
  enable-auto-commit: false  # Ручное управление offset
  max-poll-records: 500      # Максимум сообщений за poll