Когда Spark работает медленно (спиллы, шаффлы, перекосы данных)?

Ответ

Производительность Spark падает из-за нескольких типовых проблем:

  1. Перекос данных (Data Skew): Неравномерное распределение данных по партициям приводит к тому, что одни задачи выполняются долго, а другие простаивают.

    • Решение: Использовать соление ключей (salting) или увеличить количество партиций.
      // Добавление случайного префикса к ключу для борьбы с перекосом
      val saltedDf = df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 100).cast("int")))
  2. Чрезмерные шаффлы: Операции join, groupBy и orderBy вызывают перемешивание данных между узлами — это дорогая операция ввода-вывода.

    • Решение: Использовать broadcast join для маленьких датафреймов, увеличить spark.sql.shuffle.partitions и применять partitionBy при записи.
  3. Спиллы на диск: Когда данные не помещаются в оперативную память исполнителя (executor), Spark записывает их на диск, что резко замедляет работу.

    • Решение: Увеличить spark.executor.memory, настроить spark.memory.fraction и использовать persist(StorageLevel.MEMORY_AND_DISK) для промежуточных датафреймов.
  4. Неоптимальные трансформации: Использование collect() на больших датасетах или пользовательских функций (UDF) там, где можно обойтись встроенными.

    • Решение: Всегда фильтровать данные как можно раньше и минимизировать передачу данных между драйвером и исполнителями.

Для диагностики я в первую очередь анализирую Spark UI, смотрю на время выполнения стадий (Stages), объем данных при шаффле (Shuffle Read/Write) и наличие спиллов (Spill).

Ответ 18+ 🔞

Ага, слушай, вот реально классика жанра, с которой каждый, кто работал со Spark, сталкивался. Прямо как будто вчера на проде такое тушил. Сидишь, смотришь на интерфейс, а там один таск ебёт как проклятый, а остальные уже давно откинулись и курят в сторонке. Удивление пиздец, честно.

Ну, первая и самая жирная беда — это перекос данных, она же Data Skew. Представь: у тебя миллиард записей, и 90% из них имеют ключ "Москва", а остальные 10% — все остальные города. Когда ты делаешь groupBy или join, партиция с Москвой становится такой жирной, что один бедный исполнитель (executor) её тащит, как Сизиф свой камень, а остальные стоят, бздят и ждут. Это пиздец как неэффективно. Что делать? Придётся хитрить. Можно, например, добавить к ключу случайную соль — тот самый salting. Берёшь и перемешиваешь эти жирные данные по разным вёдрам.

// Берём ключ и прикручиваем к нему случайную цифру, чтобы размазать по партициям
val saltedDf = df.withColumn("salted_key", concat(col("key"), lit("_"), (rand() * 100).cast("int")))

Потом группируешь уже по этому новому ключу, а в конце — агрегируешь обратно. Геморрой, но работает. Или просто овердохуища партиций накрутить, чтобы жирную кучу разбить на мелкие комочки.

Вторая головная боль — шаффлы. Операции вроде join или orderBy — это всегда сигнал, что сейчас начнётся великое перемешивание данных по сети. Все ноды начинают орать друг на друга "дай два байта!", и трафик зашкаливает. Если соединяешь большой датафрейм с маленьким — твой лучший друг broadcast join. Кинул маленькую табличку в память каждому исполнителю, и всё, шаффла нет, магия. А ещё посмотри на настройку spark.sql.shuffle.partitions. Если её оставить по умолчанию, а данных овердохуища, то каждая партиция будет размером с хороший такой булыжник. Увеличь это число, сделай партиции помельче — распределение будет равномернее.

Третий кошмар — спиллы на диск. Это когда твоим данным тесно в оперативке, и Spark, как последний бомж, начинает скидывать их на жёсткий диск. Скорость падает ниже плинтуса, всё начинает тормозить так, что хоть волком вой. Лечится увеличением памяти исполнителям (spark.executor.memory) и грамотным кэшированием. Если промежуточный результат используется несколько раз, прибей его к памяти вызовом persist(StorageLevel.MEMORY_AND_DISK). Пусть лучше часть в памяти, часть на диске, чем каждый раз пересчитывать с нуля.

И, наконец, четвёртый пункт — кривые руки разработчика. Самый частый грех — это вызов collect() на огромном датасете. Ты этим действием выгребаешь всё с исполнителей на драйвер, который обычно скромных размеров, и он просто накрывается медным тазом. Доверия ебать ноль к такому коду. Или вот эти самописные UDF, которые можно было заменить на встроенную функцию Spark. Spark их оптимизировать не умеет, и они тормозят как черепахи. Правило простое: фильтруй и режь данные как можно раньше и используй нативные трансформации.

Как это всё искать? Твой главный инструмент — Spark UI. Открываешь его и сразу лезешь смотреть на стадии (Stages). Видишь одну задачу, которая выполняется в десять раз дольше всех? Вот он, перекос. Смотришь на метрики Shuffle Read/Write — если цифры астрономические, значит, шаффл съедает всё. И ищешь волшебное слово "Spill" — если оно есть, значит, памяти не хватило и пошла запись на диск. Вот по этой карте и ищи, где у тебя в коде засел этот пидарас шерстяной, который всё тормозит.