Ответ
Да, Kafka позволяет консьюмерам читать сообщения не только в реальном времени, но и с любого доступного момента в прошлом. Это одна из ключевых особенностей брокера, основанная на хранении сообщений в логе. Основные подходы:
-
Чтение с самого начала топика
Новый консьюмер (с новой
group.id
) может начать чтение с самого первого сообщения в топике, установив параметрauto_offset_reset='earliest'
.from kafka import KafkaConsumer # Этот консьюмер начнет читать с самого старого сообщения consumer = KafkaConsumer( 'my-topic', group_id='new-reporting-group', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest' # Ключевой параметр )
-
Поиск по временной метке (Timestamp)
Это наиболее точный способ начать чтение с конкретного времени. Метод
offsets_for_times()
находит смещения (offsets) для каждого раздела, соответствующие указанной временной метке.import time from kafka import KafkaConsumer, TopicPartition consumer = KafkaConsumer('my-topic', bootstrap_servers=['localhost:9092']) # Устанавливаем время: 1 час назад (в миллисекундах) one_hour_ago_ms = int((time.time() - 3600) * 1000) # Находим смещения для каждого раздела топика partitions = consumer.partitions_for_topic('my-topic') topic_partitions = [TopicPartition('my-topic', p) for p in partitions] end_offsets = consumer.end_offsets(topic_partitions) # Получаем offset для каждого раздела на заданное время offsets = consumer.offsets_for_times({tp: one_hour_ago_ms for tp in topic_partitions}) # Перемещаем указатель для каждого раздела for tp, offset_and_ts in offsets.items(): if offset_and_ts: consumer.seek(tp, offset_and_ts.offset) # Начинаем чтение for msg in consumer: print(f"Сообщение: {msg.value.decode('utf-8')}")
-
Ручное перемещение смещения (Offset)
Если вы знаете точный
offset
, с которого нужно начать, можно использовать методseek()
.tp = TopicPartition('my-topic', 0) # Раздел 0 consumer.seek(tp, 100) # Начать чтение со 100-го сообщения в разделе 0
Важное ограничение: возможность чтения старых сообщений ограничена политикой хранения данных на брокере (retention.ms
или retention.bytes
). Сообщения, которые старше установленного лимита, будут удалены и станут недоступны для чтения.