Что такое Apache Kafka и для чего он используется

Ответ

Apache Kafka — это распределённая платформа для потоковой обработки событий (event streaming platform), предназначенная для создания высокопроизводительных конвейеров данных, потоковых приложений и интеграции микросервисов в реальном времени.

Она работает как отказоустойчивый и масштабируемый брокер сообщений, но с возможностью долговременного хранения данных.

Ключевые компоненты

  • Topic (Топик): Именованный поток событий. Например, user_registrations.
  • Partition (Партиция): Топик разделяется на партиции для параллельной обработки и масштабирования. Порядок сообщений гарантируется только в пределах одной партиции.
  • Broker (Брокер): Сервер Kafka, который хранит данные и обслуживает запросы клиентов.
  • Producer (Поставщик): Приложение, которое публикует (отправляет) события в топики.
  • Consumer (Потребитель): Приложение, которое подписывается на топики и обрабатывает события.

Принцип работы

  1. Producer отправляет событие в конкретный топик.
  2. Kafka сохраняет событие в партиции этого топика и реплицирует его на другие брокеры для отказоустойчивости.
  3. Consumer читает события из топика, отслеживая свою позицию (offset) в каждой партиции.

Пример на Python (библиотека confluent-kafka-python)

from confluent_kafka import Producer, Consumer, KafkaError

# --- Producer: отправка сообщения ---
def delivery_report(err, msg):
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()} [{msg.partition()}]')

producer_conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(producer_conf)

producer.produce('my_topic', key='user1', value='User registered', callback=delivery_report)
producer.flush() # Дождаться отправки всех сообщений

# --- Consumer: чтение сообщения ---
consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest' # Начать чтение с самого начала
}
consumer = Consumer(consumer_conf)
consumer.subscribe(['my_topic'])

msg = consumer.poll(timeout=1.0) # Ожидать сообщение 1 секунду
if msg is not None and msg.error() is None:
    print(f"Received message: key={msg.key().decode('utf-8')}, value={msg.value().decode('utf-8')}")

consumer.close()

Основные сценарии использования

  • Микросервисная архитектура: Асинхронное общение между сервисами.
  • Сбор логов и метрик: Агрегация данных из множества источников.
  • Event Sourcing: Хранение всех изменений состояния системы в виде последовательности событий.
  • Аналитика в реальном времени: Обработка потоков данных для немедленного анализа.