Ответ
Apache Kafka — это распределённая платформа для потоковой обработки событий (event streaming platform), предназначенная для создания высокопроизводительных конвейеров данных, потоковых приложений и интеграции микросервисов в реальном времени.
Она работает как отказоустойчивый и масштабируемый брокер сообщений, но с возможностью долговременного хранения данных.
Ключевые компоненты
- Topic (Топик): Именованный поток событий. Например,
user_registrations
. - Partition (Партиция): Топик разделяется на партиции для параллельной обработки и масштабирования. Порядок сообщений гарантируется только в пределах одной партиции.
- Broker (Брокер): Сервер Kafka, который хранит данные и обслуживает запросы клиентов.
- Producer (Поставщик): Приложение, которое публикует (отправляет) события в топики.
- Consumer (Потребитель): Приложение, которое подписывается на топики и обрабатывает события.
Принцип работы
- Producer отправляет событие в конкретный топик.
- Kafka сохраняет событие в партиции этого топика и реплицирует его на другие брокеры для отказоустойчивости.
- Consumer читает события из топика, отслеживая свою позицию (offset) в каждой партиции.
Пример на Python (библиотека confluent-kafka-python
)
from confluent_kafka import Producer, Consumer, KafkaError
# --- Producer: отправка сообщения ---
def delivery_report(err, msg):
if err is not None:
print(f'Message delivery failed: {err}')
else:
print(f'Message delivered to {msg.topic()} [{msg.partition()}]')
producer_conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(producer_conf)
producer.produce('my_topic', key='user1', value='User registered', callback=delivery_report)
producer.flush() # Дождаться отправки всех сообщений
# --- Consumer: чтение сообщения ---
consumer_conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'my_group',
'auto.offset.reset': 'earliest' # Начать чтение с самого начала
}
consumer = Consumer(consumer_conf)
consumer.subscribe(['my_topic'])
msg = consumer.poll(timeout=1.0) # Ожидать сообщение 1 секунду
if msg is not None and msg.error() is None:
print(f"Received message: key={msg.key().decode('utf-8')}, value={msg.value().decode('utf-8')}")
consumer.close()
Основные сценарии использования
- Микросервисная архитектура: Асинхронное общение между сервисами.
- Сбор логов и метрик: Агрегация данных из множества источников.
- Event Sourcing: Хранение всех изменений состояния системы в виде последовательности событий.
- Аналитика в реальном времени: Обработка потоков данных для немедленного анализа.