Что такое перекос данных (data skew) в Apache Spark?

Ответ

Перекос данных (data skew) в Apache Spark — это ситуация, когда данные распределены между партициями RDD или DataFrame крайне неравномерно. В результате задачи (tasks), обрабатывающие перекошенные партиции с огромным объемом данных, выполняются намного дольше, чем остальные. Это создает "длинный хвост" выполнения, простаивание ресурсов и резко снижает общую производительность job, особенно на стадиях shuffle (например, при join или groupBy).

Типичный пример и проблема:

// Предположим, 99% событий имеют user_id = 0 (незалогиненный пользователь)
val eventLogs: DataFrame = ... // DataFrame со столбцом [user_id, event]

// При join или groupBy по user_id все записи с user_id=0 попадут в одну партицию
val skewedAgg = eventLogs.groupBy("user_id").count()
// Задача, обрабатывающая партицию для user_id=0, будет монструозной и медленной.

Основные стратегии борьбы с перекосом:

  1. Соление ключей (Salting / Key Augmentation): Добавляем случайный префикс или суффикс к ключам, чтобы разбить "тяжелый" ключ на множество мелких.

    import org.apache.spark.sql.functions._
    val saltNum = 100 // Количество "солей"
    
    // Соление для агрегации
    val saltedDF = eventLogs
      .withColumn("salted_key", concat(col("user_id"), lit("_"), (rand() * saltNum).cast("int")))
      .groupBy("salted_key")
      .agg(sum("amount").as("partial_sum"))
      .withColumn("original_key", split(col("salted_key"), "_").getItem(0))
      .groupBy("original_key").agg(sum("partial_sum").as("total_sum"))
  2. Использование Broadcast Join: Если одна из таблиц для join достаточно мала, чтобы поместиться в память всех executor'ов, Spark может разослать ее копию, избежав дорогостоящего shuffle. Это решает проблему перекоса для этого конкретного join.

    // smallDF будет broadcasted, shuffle не произойдет
    val result = largeDF.join(broadcast(smallDF), "key")
  3. Оптимизация разделения (Repartitioning): Явное увеличение числа партиций для стадии shuffle может помочь распределить нагрузку лучше, хотя при сильном перекосе одного ключа это не всегда эффективно.

    val repartitionedDF = df.repartition(200, col("key"))
  4. Использование Skew Join Hint (Spark SQL 3.0+): Позволяет явно указать ядру Spark на перекошенные ключи и их значения для специальной оптимизации.

    -- Указываем, что в таблице 'orders' для ключа 'customer_id' значения (1, 7, 9) являются перекошенными
    SELECT /*+ SKEW('orders', 'customer_id', (1, 7, 9)) */ *
    FROM orders JOIN customers ON orders.customer_id = customers.id;

Выбор стратегии зависит от конкретных данных, размера кластера и типа операции.

Ответ 18+ 🔞

А, слушай, вот тебе история про одну из главных причин, почему твой Spark-джоб может ебаться как последний олень, пока все остальные уже давно по пивку. Это называется перекос данных, или data skew, если по-умному.

Представь себе, что у тебя есть мешок с картошкой, и ты её на десять кучек делишь. Вроде бы всё честно. А потом смотришь — в девяти кучках по одной мелкой картошке, а в одной, блядь, целый центнер, и там ещё булыжник на дне. Вот задача, которая эту кучку обрабатывает, будет пиздеться до второго пришествия, пока остальные девять уже давно забили болт и ждут. Это и есть перекос. Данные по партициям размазаны как говно по тарелке — где пусто, а где густо, овердохуища.

Классический пиздец, который у всех случается:

// Допустим, 99% событий в логах — это анонимы с user_id = 0. Ну, классика.
val eventLogs: DataFrame = ... // [user_id, event]

// Пытаешься сгруппировать по юзеру — и всё, пизда рулю.
val skewedAgg = eventLogs.groupBy("user_id").count()
// Вся эта орава анонимов с нулевым айдишником влетает в одну партицию. Таска, которая её обрабатывает, становится монстром, который жрёт ресурсы и времени у неё — хуй с горы. Остальные уже сто лет как посчитали свои 10 записей и тупят.

Ну и что делать, ёпта? Способы есть, не кипишуй.

  1. Соление ключей (Salting). Хитрая, блядь, жопа. Берёшь этот жирный ключ (тот самый user_id=0) и начинаешь его дробить. Добавляешь к нему случайную соль — префикс или суффикс.

    import org.apache.spark.sql.functions._
    val saltNum = 100 // Сколько кусочков от этого пирога отломить
    
    // Солим перед агрегацией
    val saltedDF = eventLogs
      .withColumn("salted_key", concat(col("user_id"), lit("_"), (rand() * saltNum).cast("int"))) // Теперь user_id=0 размажется на 100 ключей: 0_1, 0_2, ... 0_99
      .groupBy("salted_key") // Группируем по посоленному ключу — нагрузка распределится
      .agg(sum("amount").as("partial_sum"))
      .withColumn("original_key", split(col("salted_key"), "_").getItem(0)) // Возвращаем оригинальный ключ
      .groupBy("original_key").agg(sum("partial_sum").as("total_sum")) // И финально агрегируем уже по нормальному ключу

    Суть в том, чтобы тяжёлый ключ размазать на много мелких, обработать их параллельно, а потом слепить обратно. Удивление пиздец, но работает.

  2. Broadcast Join. Это когда одна таблица такая мелкая, что её по карманам раздать можно. Если ты джойнишь большую таблицу с маленькой, то можно маленькую просто скопировать на каждый экзекьютор, и тогда никакого шаффла — а значит, и перекоса — не будет вообще.

    // smallDF — такая мартышлюшка, помещается в память. Броадкастим её.
    val result = largeDF.join(broadcast(smallDF), "key")

    Главное — не обосраться с оценкой размера. Если таблица на самом деле не маленькая, а ты её броадкастнешь, будет тебе хиросима, чувак.

  3. Репартиционирование. Иногда помогает просто увеличить число партиций, чтобы данные лучше перемешались. Но это как пытаться равномерно размешать ложкой сахар в стакане, где на дне целая гора. Если перекос дикий, одной ложкой не поможешь.

    val repartitionedDF = df.repartition(200, col("key")) // Дробим на 200 кучек вместо дефолтных 200

    Может помочь, а может и нет. Часто это просто полумера.

  4. Skew Join Hint (в Spark 3.0 и выше). Это когда ты можешь прямо намекнуть Спарку: «Слушай, дружище, у меня тут в колонке customer_id значения 1, 7, 9 — это пиздопроебибна, жирные такие, с ними аккуратнее». И Спарк постарается их особым образом обработать.

    SELECT /*+ SKEW('orders', 'customer_id', (1, 7, 9)) */ *
    FROM orders JOIN customers ON orders.customer_id = customers.id;

    Удобно, но нужно заранее знать, какие ключи — говно.

В общем, выбор способа — это как лечение: смотришь по симптомам. Иногда достаточно броадкаста, иногда надо солить как селёдку. Главное — не игнорировать проблему, а то твой джоб будет висеть вечностью, и доверия к тебе как к инженеру будет, блядь, ноль.