Расскажи про опыт работы с Apache Kafka

«Расскажи про опыт работы с Apache Kafka» — вопрос из категории Брокеры сообщений, который задают на 25% собеседований C# Разработчик. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Использовал Apache Kafka в качестве основного брокера сообщений для построения отказоустойчивых, масштабируемых event-driven систем и обработки потоковых данных.

Основные сценарии применения:

  • Интеграция микросервисов: Асинхронный обмен событиями (например, OrderCreated, PaymentProcessed) между сервисами для слабой связанности.
  • Сбор и обработка логов/метрик: Агрегация данных с множества источников в реальном времени для последующего анализа.
  • Стриминг данных: Построение пайплайнов для обработки и трансформации потоков данных (например, обогащение пользовательских действий).

Техническая реализация (на .NET): Работал с клиентской библиотекой Confluent.Kafka. Ключевые настройки, которые важно учитывать:

  • Гарантии доставки: Настройка acks=all для обеспечения записи во все реплики партиции перед подтверждением продюсеру (уровень consistency).
  • Идемпотентность и транзакции: Использование для исключения дублей и атомарной публикации в несколько топиков.
  • Сериализация: Использование Avro со Schema Registry для контроля совместимости схем данных.

Пример продюсера с обработкой ошибок и идемпотентностью:

var producerConfig = new ProducerConfig
{
    BootstrapServers = "kafka-broker:9092",
    Acks = Acks.All, // Максимальная надежность
    EnableIdempotence = true, // Исключает дублирование
    MessageSendMaxRetries = 3,
    RetryBackoffMs = 1000
};

using var producer = new ProducerBuilder<Null, OrderEvent>(producerConfig)
    .SetValueSerializer(new AvroSerializer<OrderEvent>(schemaRegistryClient)) // Avro сериализатор
    .Build();

try
{
    var message = new Message<Null, OrderEvent> { Value = orderEvent };
    var deliveryReport = await producer.ProduceAsync("orders", message);

    Console.WriteLine($"Delivered to: {deliveryReport.TopicPartitionOffset}");
}
catch (ProduceException<Null, OrderEvent> ex)
{
    Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
    // Логика повторной попытки или перемещения в dead-letter очередь
}

Пример консьюмера (Consumer Group):

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "kafka-broker:9092",
    GroupId = "order-processor-group", // Группа для распределения партиций
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false // Ручное подтверждение для at-least-once семантики
};

using var consumer = new ConsumerBuilder<Ignore, OrderEvent>(consumerConfig)
    .SetValueDeserializer(new AvroDeserializer<OrderEvent>(schemaRegistryClient).AsSyncOverAsync())
    .Build();

consumer.Subscribe("orders");

while (true)
{
    var consumeResult = consumer.Consume(ct);
    try
    {
        ProcessOrder(consumeResult.Message.Value); // Обработка сообщения
        consumer.Commit(consumeResult); // Подтверждение успешной обработки
    }
    catch (Exception ex)
    {
        // Логика для poison messages (например, отправка в топик-"свалку")
        _logger.LogError(ex, "Failed to process order from Kafka.");
    }
}

Администрирование и мониторинг: Имею опыт базовой настройки кластера (репликация, партиционирование), тюнинга производительности (размер батча, компрессия) и настройки мониторинга метрик (через JMX в Prometheus + Grafana) для отслеживания лагов консьюмеров и задержек.