Ответ
Apache Kafka — распределенная система потоковой обработки данных, работающая по модели публикация-подписка. Рассмотрим архитектуру и процесс сбора данных.
Архитектурные компоненты:
- Producer (Продюсер) — отправляет сообщения в топики
- Topic (Топик) — категория/поток сообщений
- Partition (Партиция) — параллельная единица топика для масштабирования
- Consumer (Консьюмер) — читает сообщения из топиков
- Broker (Брокер) — сервер Kafka, хранящий данные
- ZooKeeper — координация кластера (в новых версиях заменяется на KRaft)
Процесс сбора данных:
// Пример продюсера на Java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class DataCollectorProducer {
public static void main(String[] args) {
// Конфигурация продюсера
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // Гарантия доставки
props.put("retries", 3); // Повторные попытки
// Создание продюсера
Producer<String, String> producer = new KafkaProducer<>(props);
try {
// Сбор данных из источника (например, IoT-устройства)
SensorData sensorData = collectSensorData();
// Создание сообщения
String key = sensorData.getDeviceId(); // Ключ определяет партицию
String value = sensorData.toJson();
// Отправка в топик "sensor-readings"
ProducerRecord<String, String> record =
new ProducerRecord<>("sensor-readings", key, value);
// Асинхронная отправка с callback
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.printf(
"Сообщение отправлено в топик=%s, партиция=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset()
);
} else {
System.err.println("Ошибка отправки: " + exception.getMessage());
}
}
});
} finally {
producer.flush(); // Отправка оставшихся сообщений
producer.close(); // Закрытие соединения
}
}
}
Распределение по партициям:
// Ключ сообщения определяет партицию через хеширование
// Сообщения с одинаковым ключом попадают в одну партицию
// Пример 1: С ключом (гарантированный порядок для deviceId)
ProducerRecord<String, String> record1 =
new ProducerRecord<>("logs", "server-123", "Error: Disk full");
// Все сообщения от server-123 будут в одной партиции
// Пример 2: Без ключа (round-robin распределение)
ProducerRecord<String, String> record2 =
new ProducerRecord<>("metrics", null, "CPU: 45%");
// Сообщения распределяются по партициям циклически
Обработка на стороне брокера:
- Прием сообщений: Брокер получает сообщения от продюсеров
- Распределение: Сообщения распределяются по партициям топика
- Сохранение: Данные записываются в сегменты на диск
- Репликация: Каждая партиция реплицируется на несколько брокеров
- Коммит лога: Сообщения подтверждаются согласно настройкам
acks
Конфигурация топика:
# Создание топика с настройками
kafka-topics.sh --create
--topic user-actions
--partitions 6 # 6 партиций для параллельной обработки
--replication-factor 3 # 3 реплики для отказоустойчивости
--config retention.ms=604800000 # Хранение 7 дней
--config cleanup.policy=delete
--bootstrap-server localhost:9092
Потоковая обработка с Kafka Streams:
// Обработка данных в реальном времени
StreamsBuilder builder = new StreamsBuilder();
// Чтение из входного топика
KStream<String, String> source = builder.stream("raw-sensor-data");
// Преобразование данных
KStream<String, Double> processed = source
.filter((key, value) -> value != null)
.mapValues(value -> parseTemperature(value))
.filter((key, temp) -> temp > -50 && temp < 100) // Валидация
.map((key, temp) -> KeyValue.pair(key, celsiusToFahrenheit(temp)));
// Агрегация (оконные операции)
KTable<Windowed<String>, Long> hourlyCounts = processed
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofHours(1)))
.count();
// Запись в выходной топик
processed.to("processed-sensor-data");
hourlyCounts.toStream().to("sensor-stats-hourly");
// Запуск приложения обработки
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
Гарантии доставки:
- At least once: Сообщения не теряются, но могут дублироваться
- At most once: Сообщения могут теряться, но не дублируются
- Exactly once: Идеальный вариант (требует транзакций)
Мониторинг и управление:
# Просмотр смещений (offsets)
kafka-consumer-groups.sh --describe
--group data-processors
--bootstrap-server localhost:9092
# Просмотр сообщений в топике
kafka-console-consumer.sh
--topic user-actions
--from-beginning
--bootstrap-server localhost:9092
# Проверка состояния кластера
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
Использование в микросервисной архитектуре:
# Пример конфигурации для сбора логов
producer:
topic: "application-logs"
partitions: 12
replication: 3
compression: snappy # Сжатие для экономии трафика
batch-size: 16384 # Размер батча для оптимизации
linger-ms: 5 # Задержка перед отправкой батча
consumer:
group-id: "log-aggregator"
auto-offset-reset: "latest"
enable-auto-commit: false # Ручное управление offset
max-poll-records: 500 # Максимум сообщений за poll