Как подключиться к топикам Apache Kafka?

«Как подключиться к топикам Apache Kafka?» — вопрос из категории Потоковая обработка и Kafka, который задают на 33% собеседований Data Инженер. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Подключение к Kafka осуществляется через клиентские библиотеки. Я чаще всего работаю с Java (Spring Kafka) и Python (confluent-kafka). Вот базовые шаги.

1. Подключение Consumer (на Python с confluent-kafka):

from confluent_kafka import Consumer, KafkaError

config = {
    'bootstrap.servers': 'kafka-broker-1:9092,kafka-broker-2:9092',  # Список брокеров
    'group.id': 'my-service-consumer-group',  # Группа для управления offset'ами
    'auto.offset.reset': 'earliest',  # Чтение с начала, если нет сохраненного offset
    'enable.auto.commit': False  # Ручное управление коммитами для гарантии обработки
}

consumer = Consumer(config)
consumer.subscribe(['my-input-topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            else:
                print(f"Consumer error: {msg.error()}")
                break
        # Обработка сообщения
        print(f"Received: {msg.value().decode('utf-8')} from partition {msg.partition()}")
        # После успешной обработки коммитим offset вручную
        consumer.commit(message=msg, asynchronous=False)
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

2. Подключение Producer (ключевые настройки):

from confluent_kafka import Producer

producer_config = {
    'bootstrap.servers': 'kafka-broker:9092',
    'acks': 'all',  # Гарантия записи во все реплики (максимальная надежность)
    'retries': 5,   # Количество попыток повторной отправки при ошибках
    'compression.type': 'snappy'  # Сжатие для экономии трафика
}
producer = Producer(producer_config)

Критически важные моменты:

  • Безопасность: В продакшене всегда использую security.protocol с SASL/SSL.
  • Надежность: Для продюсера acks='all', для консьюмера — ручное управление offset'ами.
  • Производительность: Настройки batch.size и linger.ms для продюсера, fetch.min.bytes для консьюмера.