Ответ
Да, Kafka позволяет консьюмерам читать сообщения не только в реальном времени, но и с любого доступного момента в прошлом. Это одна из ключевых особенностей брокера, основанная на хранении сообщений в логе. Основные подходы:
-
Чтение с самого начала топика
Новый консьюмер (с новой
group.id) может начать чтение с самого первого сообщения в топике, установив параметрauto_offset_reset='earliest'.from kafka import KafkaConsumer # Этот консьюмер начнет читать с самого старого сообщения consumer = KafkaConsumer( 'my-topic', group_id='new-reporting-group', bootstrap_servers=['localhost:9092'], auto_offset_reset='earliest' # Ключевой параметр ) -
Поиск по временной метке (Timestamp)
Это наиболее точный способ начать чтение с конкретного времени. Метод
offsets_for_times()находит смещения (offsets) для каждого раздела, соответствующие указанной временной метке.import time from kafka import KafkaConsumer, TopicPartition consumer = KafkaConsumer('my-topic', bootstrap_servers=['localhost:9092']) # Устанавливаем время: 1 час назад (в миллисекундах) one_hour_ago_ms = int((time.time() - 3600) * 1000) # Находим смещения для каждого раздела топика partitions = consumer.partitions_for_topic('my-topic') topic_partitions = [TopicPartition('my-topic', p) for p in partitions] end_offsets = consumer.end_offsets(topic_partitions) # Получаем offset для каждого раздела на заданное время offsets = consumer.offsets_for_times({tp: one_hour_ago_ms for tp in topic_partitions}) # Перемещаем указатель для каждого раздела for tp, offset_and_ts in offsets.items(): if offset_and_ts: consumer.seek(tp, offset_and_ts.offset) # Начинаем чтение for msg in consumer: print(f"Сообщение: {msg.value.decode('utf-8')}") -
Ручное перемещение смещения (Offset)
Если вы знаете точный
offset, с которого нужно начать, можно использовать методseek().tp = TopicPartition('my-topic', 0) # Раздел 0 consumer.seek(tp, 100) # Начать чтение со 100-го сообщения в разделе 0
Важное ограничение: возможность чтения старых сообщений ограничена политикой хранения данных на брокере (retention.ms или retention.bytes). Сообщения, которые старше установленного лимита, будут удалены и станут недоступны для чтения.
Ответ 18+ 🔞
Да ты послушай, какая же это, блядь, офигенная штука — Кафка! Это ж не просто очередь, это, сука, машина времени для данных! Хочешь в прошлое заглянуть — пожалуйста, на хуй! Хочешь с самого начала историю топика прочитать — без проблем, ёпта!
Вот смотри, как это работает, блядь. Основная фишка в том, что все сообщения хранятся в логе, как в дневнике у девочки-подростка, только без соплей, а с данными. И этот дневник, блядь, не стирается просто так!
Первый способ — для новеньких, которые хотят всё с нуля.
Ты создаёшь нового консьюмера, даёшь ему свежую group.id, чтобы он не примазался к старым, и говоришь: «Читай, сука, с самого древнего, что есть!» Параметр auto_offset_reset='earliest' — это как волшебное слово.
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'my-topic',
group_id='new-reporting-group',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest' # Вот эта хуйня заставляет читать с самого начала!
)
Второй способ — для точных, как швейцарские часы.
Допустим, тебе надо прочитать всё, что было за последний час. Ну, там, откатить какой-нибудь косяк, или отчёт построить. Берёшь временную метку, переводишь в миллисекунды (потому что Кафка, блядь, любит точность) и ищешь смещения под это время.
import time
from kafka import KafkaConsumer, TopicPartition
consumer = KafkaConsumer('my-topic', bootstrap_servers=['localhost:9092'])
# Допустим, час назад. Считаем, блядь, как в школе.
one_hour_ago_ms = int((time.time() - 3600) * 1000)
# Выясняем, какие есть разделы в топике
partitions = consumer.partitions_for_topic('my-topic')
topic_partitions = [TopicPartition('my-topic', p) for p in partitions]
# Спрашиваем у Кафки: «А где, блядь, в каждом разделе сообщение, которое было вот в это время?»
offsets = consumer.offsets_for_times({tp: one_hour_ago_ms for tp in topic_partitions})
# И перематываем плёнку на нужное место!
for tp, offset_and_ts in offsets.items():
if offset_and_ts:
consumer.seek(tp, offset_and_ts.offset)
# Всё, погнали читать историю!
for msg in consumer:
print(f"Сообщение: {msg.value.decode('utf-8')}")
Третий способ — для упоротых перфекционистов.
Если ты знаешь точный номер сообщения (offset), с которого надо начать, то просто тычешь консьюмером в него, как слепого в нужную дверь. Метод seek() — твой друг.
tp = TopicPartition('my-topic', 0) # Берём, например, нулевой раздел
consumer.seek(tp, 100) # И начинаем читать со сотого сообщения, ёбта!
Но, слушай сюда, ёпта, есть один важный нюанс, про который все забывают, а потом охуевают! Кафка — не вечный архив, блядь! Сообщения хранятся не бесконечно, а столько, сколько сказано в настройках брокера (retention.ms или retention.bytes). Если твоё «прошлое» старше этого срока — считай, его съела моль, нахуй! Оно удалено, и ты его уже не прочитаешь, как ни старайся. Так что имей в виду, хитрая жопа!