Ответ
Подключение к Kafka осуществляется через клиентские библиотеки. Я чаще всего работаю с Java (Spring Kafka) и Python (confluent-kafka). Вот базовые шаги.
1. Подключение Consumer (на Python с confluent-kafka):
from confluent_kafka import Consumer, KafkaError
config = {
'bootstrap.servers': 'kafka-broker-1:9092,kafka-broker-2:9092', # Список брокеров
'group.id': 'my-service-consumer-group', # Группа для управления offset'ами
'auto.offset.reset': 'earliest', # Чтение с начала, если нет сохраненного offset
'enable.auto.commit': False # Ручное управление коммитами для гарантии обработки
}
consumer = Consumer(config)
consumer.subscribe(['my-input-topic'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(f"Consumer error: {msg.error()}")
break
# Обработка сообщения
print(f"Received: {msg.value().decode('utf-8')} from partition {msg.partition()}")
# После успешной обработки коммитим offset вручную
consumer.commit(message=msg, asynchronous=False)
except KeyboardInterrupt:
pass
finally:
consumer.close()
2. Подключение Producer (ключевые настройки):
from confluent_kafka import Producer
producer_config = {
'bootstrap.servers': 'kafka-broker:9092',
'acks': 'all', # Гарантия записи во все реплики (максимальная надежность)
'retries': 5, # Количество попыток повторной отправки при ошибках
'compression.type': 'snappy' # Сжатие для экономии трафика
}
producer = Producer(producer_config)
Критически важные моменты:
- Безопасность: В продакшене всегда использую
security.protocolс SASL/SSL. - Надежность: Для продюсера
acks='all', для консьюмера — ручное управление offset'ами. - Производительность: Настройки
batch.sizeиlinger.msдля продюсера,fetch.min.bytesдля консьюмера.