Как в Kafka прочитать сообщения, начиная с определенного момента в прошлом

Ответ

Да, Kafka позволяет консьюмерам читать сообщения не только в реальном времени, но и с любого доступного момента в прошлом. Это одна из ключевых особенностей брокера, основанная на хранении сообщений в логе. Основные подходы:

  1. Чтение с самого начала топика

    Новый консьюмер (с новой group.id) может начать чтение с самого первого сообщения в топике, установив параметр auto_offset_reset='earliest'.

    from kafka import KafkaConsumer
    
    # Этот консьюмер начнет читать с самого старого сообщения
    consumer = KafkaConsumer(
        'my-topic',
        group_id='new-reporting-group',
        bootstrap_servers=['localhost:9092'],
        auto_offset_reset='earliest' # Ключевой параметр
    )
  2. Поиск по временной метке (Timestamp)

    Это наиболее точный способ начать чтение с конкретного времени. Метод offsets_for_times() находит смещения (offsets) для каждого раздела, соответствующие указанной временной метке.

    import time
    from kafka import KafkaConsumer, TopicPartition
    
    consumer = KafkaConsumer('my-topic', bootstrap_servers=['localhost:9092'])
    
    # Устанавливаем время: 1 час назад (в миллисекундах)
    one_hour_ago_ms = int((time.time() - 3600) * 1000)
    
    # Находим смещения для каждого раздела топика
    partitions = consumer.partitions_for_topic('my-topic')
    topic_partitions = [TopicPartition('my-topic', p) for p in partitions]
    end_offsets = consumer.end_offsets(topic_partitions)
    
    # Получаем offset для каждого раздела на заданное время
    offsets = consumer.offsets_for_times({tp: one_hour_ago_ms for tp in topic_partitions})
    
    # Перемещаем указатель для каждого раздела
    for tp, offset_and_ts in offsets.items():
        if offset_and_ts:
            consumer.seek(tp, offset_and_ts.offset)
    
    # Начинаем чтение
    for msg in consumer:
        print(f"Сообщение: {msg.value.decode('utf-8')}")
  3. Ручное перемещение смещения (Offset)

    Если вы знаете точный offset, с которого нужно начать, можно использовать метод seek().

    tp = TopicPartition('my-topic', 0) # Раздел 0
    consumer.seek(tp, 100) # Начать чтение со 100-го сообщения в разделе 0

Важное ограничение: возможность чтения старых сообщений ограничена политикой хранения данных на брокере (retention.ms или retention.bytes). Сообщения, которые старше установленного лимита, будут удалены и станут недоступны для чтения.

Ответ 18+ 🔞

Да ты послушай, какая же это, блядь, офигенная штука — Кафка! Это ж не просто очередь, это, сука, машина времени для данных! Хочешь в прошлое заглянуть — пожалуйста, на хуй! Хочешь с самого начала историю топика прочитать — без проблем, ёпта!

Вот смотри, как это работает, блядь. Основная фишка в том, что все сообщения хранятся в логе, как в дневнике у девочки-подростка, только без соплей, а с данными. И этот дневник, блядь, не стирается просто так!

Первый способ — для новеньких, которые хотят всё с нуля.
Ты создаёшь нового консьюмера, даёшь ему свежую group.id, чтобы он не примазался к старым, и говоришь: «Читай, сука, с самого древнего, что есть!» Параметр auto_offset_reset='earliest' — это как волшебное слово.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    group_id='new-reporting-group',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest' # Вот эта хуйня заставляет читать с самого начала!
)

Второй способ — для точных, как швейцарские часы.
Допустим, тебе надо прочитать всё, что было за последний час. Ну, там, откатить какой-нибудь косяк, или отчёт построить. Берёшь временную метку, переводишь в миллисекунды (потому что Кафка, блядь, любит точность) и ищешь смещения под это время.

import time
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer('my-topic', bootstrap_servers=['localhost:9092'])

# Допустим, час назад. Считаем, блядь, как в школе.
one_hour_ago_ms = int((time.time() - 3600) * 1000)

# Выясняем, какие есть разделы в топике
partitions = consumer.partitions_for_topic('my-topic')
topic_partitions = [TopicPartition('my-topic', p) for p in partitions]

# Спрашиваем у Кафки: «А где, блядь, в каждом разделе сообщение, которое было вот в это время?»
offsets = consumer.offsets_for_times({tp: one_hour_ago_ms for tp in topic_partitions})

# И перематываем плёнку на нужное место!
for tp, offset_and_ts in offsets.items():
    if offset_and_ts:
        consumer.seek(tp, offset_and_ts.offset)

# Всё, погнали читать историю!
for msg in consumer:
    print(f"Сообщение: {msg.value.decode('utf-8')}")

Третий способ — для упоротых перфекционистов.
Если ты знаешь точный номер сообщения (offset), с которого надо начать, то просто тычешь консьюмером в него, как слепого в нужную дверь. Метод seek() — твой друг.

tp = TopicPartition('my-topic', 0) # Берём, например, нулевой раздел
consumer.seek(tp, 100) # И начинаем читать со сотого сообщения, ёбта!

Но, слушай сюда, ёпта, есть один важный нюанс, про который все забывают, а потом охуевают! Кафка — не вечный архив, блядь! Сообщения хранятся не бесконечно, а столько, сколько сказано в настройках брокера (retention.ms или retention.bytes). Если твоё «прошлое» старше этого срока — считай, его съела моль, нахуй! Оно удалено, и ты его уже не прочитаешь, как ни старайся. Так что имей в виду, хитрая жопа!