Как структурировать и фильтровать смешанные логи в Logstash?

Ответ

Основная задача при работе со смешанными логами в Logstash — это их парсинг и обогащение, чтобы превратить неструктурированный текст в структурированные события (например, JSON). Это достигается с помощью конвейера фильтров (filters).

1. Парсинг с помощью grok

Фильтр grok использует готовые шаблоны (patterns) для извлечения полей из строки лога. Это первый и самый важный шаг для придания структуры.

filter {
  # Применяем grok-шаблон для логов Nginx
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
  }
}

2. Обработка многострочных логов (стектрейсов)

Для логов, занимающих несколько строк (например, Java stack traces), используется кодек multiline. Он объединяет строки, относящиеся к одному событию, в одно сообщение до парсинга.

input {
  file {
    path => "/var/log/myapp.log"
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}" # Новое событие начинается с временной метки
      negate => true
      what => "previous"
    }
  }
}

3. Обогащение и фильтрация с помощью условной логики

Используйте условные операторы (if/else) для применения разных правил к разным типам логов. Часто это используется для добавления тегов, которые помогают в дальнейшей фильтрации и маршрутизации.

filter {
  if [message] =~ /error|exception/i {
    mutate {
      add_tag => ["error_log"]
    }
  }
  # Удаляем отладочные сообщения, чтобы не хранить их
  if [loglevel] == "DEBUG" {
    drop {}
  }
}

4. Маршрутизация в output на основе тегов

В блоке output можно использовать теги, чтобы отправлять разные логи в разные места. Например, ошибки можно отправлять в отдельный индекс Elasticsearch для быстрого поиска и алертинга.

output {
  if "error_log" in [tags] {
    elasticsearch {
      hosts => ["es-node-1:9200"]
      index => "logs-errors-%{+YYYY.MM.dd}"
    }
  } else {
    elasticsearch {
      hosts => ["es-node-1:9200"]
      index => "logs-generic-%{+YYYY.MM.dd}"
    }
  }
}

Ответ 18+ 🔞

А, ну это же про то, как из этой каши в логах сделать конфетку, чтобы потом можно было по ней глазами скользить, а не ебаться с расшифровкой, как с китайской грамотой!

Так вот, смотри. Главная твоя задача — это взять эту бесформенную пиздобратию, которая сыпется из всех щелей (nginx, приложение, ядро), и превратить её в аккуратненький JSON, с полями, датами и прочей хуетой, которую Elasticsearch сожрёт с удовольствием. Всё это делается через фильтры в Logstash, там где filter.

1. Парсинг через grok — наш главный отмычек

Это как раз тот мужик, который умеет выковыривать из строки куски по шаблону. Типа, вот тебе строка лога, а ты ему говоришь: «Видишь вот тут IP-адрес? Вытащи его в поле clientip. А вот это дата — в timestamp». И он вытаскивает, ёпта!

filter {
  # Допустим, у тебя логи от Nginx лезут
  if [type] == "nginx-access" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" } # Вот этот шаблон — готовый, как булка, для Apache/Nginx логов. Удобно, блядь!
    }
  }
}

После этого в событии появятся поля: clientip, timestamp, request, response и куча других. Вместо одной строки message — целый набор ключей-значений. Красота!

2. А если логи на несколько строк размазаны? Стектрейсы, например

Ну, классика: упало приложение, и на тебе — трёхэтажный стектрейс, где каждая строка с новой линии. Если это так отправить в Logstash, он каждую строчку посчитает отдельным событием, и будет пиздец, а не логи. Надо склеить.

Для этого есть кодек multiline прямо на входе (input). Он как бы говорит: «Эй, если строка НЕ начинается с временной метки (шаблон pattern), то прилепи её к предыдущей строке (what => "previous")».

input {
  file {
    path => "/var/log/myapp.log"
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}" # Новое событие — только то, что начинается с даты-времени
      negate => true
      what => "previous"
    }
  }
}

Вот так весь стектрейс станет одним большим message, и потом его уже можно парсить.

3. Обогащаем и фильтруем — добавляем мозгов

Допустим, ты распарсил логи. Теперь можно пошаманить. Видишь в сообщении слова «error» или «exception»? Вешай тег error_log — потом пригодится.

filter {
  if [message] =~ /error|exception/i {
    mutate {
      add_tag => ["error_log"] # Бац, и у события теперь есть метка
    }
  }
  # А вот отладочные логи (DEBUG) нам нахуй не сдались, пусть летят в /dev/null
  if [loglevel] == "DEBUG" {
    drop {} # Просто выкидываем событие, и всё
  }
}

Условная логика (if/else) — это твой лучший друг. С её помощью можно творить вообще что угодно.

4. И наконец — раскидываем по разным углам (output)

Вот у нас есть события, некоторые с тегом error_log. Хранить их в одной куче с остальным мусором? Да ну нахуй! Отправляем ошибки в отдельный индекс в Elasticsearch, чтобы потом быстро находить и бить тревогу.

output {
  if "error_log" in [tags] {
    elasticsearch {
      hosts => ["es-node-1:9200"]
      index => "logs-errors-%{+YYYY.MM.dd}" # Индекс для ошибок
    }
  } else {
    elasticsearch {
      hosts => ["es-node-1:9200"]
      index => "logs-generic-%{+YYYY.MM.dd}" # Индекс для всего остального
    }
  }
}

Вот и весь базовый принцип, ёпта. Разобрал на куски → прицепил бирки → разложил по полочкам. Ничего сложного, главное — начать и не бояться экспериментировать с grok-шаблонами, а то с ними иногда волосы дыбом встают, честное слово!