Как посмотреть план выполнения запроса (execution plan) в Apache Spark?

Ответ

В Apache Spark план выполнения запроса можно и нужно анализировать для понимания логики работы и оптимизации. Основной метод — .explain().

1. Основные режимы .explain():

  • .explain() или .explain(mode="simple"): Выводит только физический план (Physical Plan).
  • .explain(mode="extended") или .explain(True): Выводит логический (Logical Plan), оптимизированный логический план (Optimized Logical Plan) и физический план (Physical Plan).
  • .explain(mode="formatted"): Выводит план в виде отдельной таблицы с разделами, что улучшает читаемость.
  • .explain(mode="cost"): Доступен, если включена поддержка CBO (Cost-Based Optimizer). Показывает план с оценкой стоимости операций.

Пример:

val df = spark.read.parquet("data.parquet")
val resultDF = df.filter($"age" > 25).groupBy("department").agg(avg("salary"))

// Самый полезный вариант для анализа
resultDF.explain("extended")

2. Ключевые элементы плана и на что обращать внимание:

  • Scan / FileScan: Как и откуда читаются данные. Проверяйте, используются ли партиционирование и фильтрация на уровне чтения (PushedFilters).
  • Filter: Где применяется условие WHERE. Желательно, чтобы фильтрация происходила как можно раньше.
  • Project: Выбор столбцов.
  • HashAggregate или ObjectHashAggregate: Операции агрегации (GROUP BY, агрегатные функции).
  • Exchange: Ключевой момент! Это операция shuffle (перемешивание данных между узлами кластера). Shuffle — самая дорогая операция. Её наличие указывает на необходимость перераспределения данных (например, перед groupBy по ключу, отличному от партиционирования).
    • Exchange hashpartitioning(...) — shuffle для агрегаций или join.
    • Exchange singlepartitioning — сбор всех данных в один узел (опасно для больших данных!).
  • Sort: Сортировка. Может быть очень дорогой.
  • BroadcastHashJoin или SortMergeJoin: Тип соединения (join). BroadcastHashJoin (broadcast) обычно эффективнее, если одна из таблиц мала.

3. Практические шаги по оптимизации на основе плана:

  1. Ищите Exchange (shuffle): Можно ли его избежать? Например, перепартиционировав данные заранее или используя broadcast join для маленькой таблицы.
  2. Проверяйте PushedFilters в Scan: Убедитесь, что предикаты фильтрации "проталкиваются" до чтения данных (особенно для форматов Parquet/ORC).
  3. Обращайте внимание на размеры данных на каждом этапе: План может показывать примерное число строк. Резкий рост числа строк после Join может указывать на проблему (декартово произведение).
  4. Сравнивайте планы: Запустите .explain() для разных вариантов записи одного запроса (например, с перестановкой filter и join), чтобы увидеть, как меняется план.

Ответ 18+ 🔞

А, ну вот, опять про эти планы выполнения в Spark. Слушай, тут всё просто, как три копейки, но если не вникать — можно голову сломать, честное слово. Как будто пытаешься понять, почему холодильник гудит, а морозилка не морозит. В общем, смотри.

1. Как эту хрень вообще посмотреть? Метод .explain()

Есть у тебя этот датафрейм, красавец, и ты хочешь узнать, что он там внутри себя думает, прежде чем начнёт ебашить. Так вот, ты ему — .explain(). Но есть нюансы, ёпта.

  • .explain() или .explain(mode="simple"): Это как быстрый взгляд под капот. Покажет только финальный, физический план — что двигатели будут делать. Без подробностей, кто куда и зачем.
  • .explain(mode="extended") или .explain(True): Вот это уже серьёзно. Тут тебе вывалит всю подноготную: как ты хотел (логический план), как Spark это переделал (оптимизированный план), и что будет (физический план). Самый полезный режим, когда пытаешься понять, почему запрос тормозит как вязкая жопа.
  • .explain(mode="formatted"): Для эстетов. Выводит всё красивыми табличками, разделяет на блоки. Глаза не так болят, когда смотришь на овердохуища строк.
  • .explain(mode="cost"): Это если ты включил у себя CBO (Cost-Based Optimizer), эту хитрожопую штуку, которая пытается считать стоимость операций. Не всегда работает, но если работает — может быть полезно.

Вот тебе живой пример, чтобы не быть голословным:

val df = spark.read.parquet("data.parquet")
val resultDF = df.filter($"age" > 25).groupBy("department").agg(avg("salary"))

// Делай именно так, если хочешь докопаться до сути
resultDF.explain("extended")

2. На что в этом плане смотреть, чтобы не охуеть?

Там куча терминов, но главных — несколько. Ищи их, как иголку в стоге сена.

  • Scan / FileScan: Откуда данные сосёт. Смотри, есть ли там PushedFilters. Если есть — красава, фильтры "протолкнулись" до чтения файла (особенно для Parquet/ORC), и он не будет читать всё подряд, а только нужное. Если нет — готовься, будет жрать всё, как не в себя.
  • Filter: Это твоё условие WHERE. В идеале, чтобы оно висело как можно раньше в плане и отсекало хлам сразу, а не таскало его через весь конвейер.
  • Project: Просто выбор столбцов. Обычно не проблема.
  • HashAggregate: А вот это уже группировки и агрегатные функции (типа avg, sum). Может быть дороговато.
  • Exchange: ВОТ ОН, КОРОЛЬ ПРОБЛЕМ! Запомни это слово. Это операция shuffle — когда данные начинают метаться между всеми узлами кластера туда-сюда. Самая медленная и дорогая хуйня в Spark. Если видишь Exchange hashpartitioning — значит, будет перемешивание для groupBy или join. А если видишь Exchange singlepartitioning — это вообще пиздец, все данные поедут в одну точку. Беги от этого как чёрт от ладана.
  • Sort: Сортировка. Может быть такой же прожорливой, как shuffle.
  • BroadcastHashJoin vs SortMergeJoin: Типы джойнов. BroadcastHashJoin — это когда маленькую табличку раскидывают по всем узлам. Обычно быстрее. SortMergeJoin — когда обе большие, их сортируют и мержат. Медленнее. Если видишь второй, а табличка одна — маленькая, задумайся о хинте broadcast.

3. И что со всем этим делать? Практика.

  1. Охотись за Exchange: Увидел — спроси себя: "А оно мне надо? Можно ли без этого?" Может, данные перепартиционировать заранее? Или broadcast join использовать? Каждый убранный shuffle — это тебе плюс в карму и минус к времени выполнения.
  2. Лезь в Scan и ищи PushedFilters: Если их нет, а фильтры в запросе есть — это повод задуматься. Может, с форматом данных что не так?
  3. Смотри на размеры: В плане иногда пишут примерное число строк. Если после Join строк стало в сто раз больше — это тревожный звоночек. Не дай бог, у тебя декартово произведение вылезло.
  4. Сравнивай, блядь! Это главный инструмент. Написал запрос одним способом — посмотрел план. Переставил filter и join местами — снова посмотрел план. Убрал ненужную колонку — и опять план. Spark иногда умный, а иногда — мудя. Только сравнивая, поймёшь, что для него лучше.

В общем, план выполнения — это не магия, а инструкция. Надо просто научиться её читать, и тогда половина проблем с оптимизацией решится сама собой. Удачи, не засоряй кластер!