Ответ
Перекос данных (data skew) в Apache Spark — это ситуация, когда данные распределены между партициями RDD или DataFrame крайне неравномерно. В результате задачи (tasks), обрабатывающие перекошенные партиции с огромным объемом данных, выполняются намного дольше, чем остальные. Это создает "длинный хвост" выполнения, простаивание ресурсов и резко снижает общую производительность job, особенно на стадиях shuffle (например, при join или groupBy).
Типичный пример и проблема:
// Предположим, 99% событий имеют user_id = 0 (незалогиненный пользователь)
val eventLogs: DataFrame = ... // DataFrame со столбцом [user_id, event]
// При join или groupBy по user_id все записи с user_id=0 попадут в одну партицию
val skewedAgg = eventLogs.groupBy("user_id").count()
// Задача, обрабатывающая партицию для user_id=0, будет монструозной и медленной.
Основные стратегии борьбы с перекосом:
-
Соление ключей (Salting / Key Augmentation): Добавляем случайный префикс или суффикс к ключам, чтобы разбить "тяжелый" ключ на множество мелких.
import org.apache.spark.sql.functions._ val saltNum = 100 // Количество "солей" // Соление для агрегации val saltedDF = eventLogs .withColumn("salted_key", concat(col("user_id"), lit("_"), (rand() * saltNum).cast("int"))) .groupBy("salted_key") .agg(sum("amount").as("partial_sum")) .withColumn("original_key", split(col("salted_key"), "_").getItem(0)) .groupBy("original_key").agg(sum("partial_sum").as("total_sum")) -
Использование Broadcast Join: Если одна из таблиц для join достаточно мала, чтобы поместиться в память всех executor'ов, Spark может разослать ее копию, избежав дорогостоящего shuffle. Это решает проблему перекоса для этого конкретного join.
// smallDF будет broadcasted, shuffle не произойдет val result = largeDF.join(broadcast(smallDF), "key") -
Оптимизация разделения (Repartitioning): Явное увеличение числа партиций для стадии shuffle может помочь распределить нагрузку лучше, хотя при сильном перекосе одного ключа это не всегда эффективно.
val repartitionedDF = df.repartition(200, col("key")) -
Использование Skew Join Hint (Spark SQL 3.0+): Позволяет явно указать ядру Spark на перекошенные ключи и их значения для специальной оптимизации.
-- Указываем, что в таблице 'orders' для ключа 'customer_id' значения (1, 7, 9) являются перекошенными SELECT /*+ SKEW('orders', 'customer_id', (1, 7, 9)) */ * FROM orders JOIN customers ON orders.customer_id = customers.id;
Выбор стратегии зависит от конкретных данных, размера кластера и типа операции.
Ответ 18+ 🔞
А, слушай, вот тебе история про одну из главных причин, почему твой Spark-джоб может ебаться как последний олень, пока все остальные уже давно по пивку. Это называется перекос данных, или data skew, если по-умному.
Представь себе, что у тебя есть мешок с картошкой, и ты её на десять кучек делишь. Вроде бы всё честно. А потом смотришь — в девяти кучках по одной мелкой картошке, а в одной, блядь, целый центнер, и там ещё булыжник на дне. Вот задача, которая эту кучку обрабатывает, будет пиздеться до второго пришествия, пока остальные девять уже давно забили болт и ждут. Это и есть перекос. Данные по партициям размазаны как говно по тарелке — где пусто, а где густо, овердохуища.
Классический пиздец, который у всех случается:
// Допустим, 99% событий в логах — это анонимы с user_id = 0. Ну, классика.
val eventLogs: DataFrame = ... // [user_id, event]
// Пытаешься сгруппировать по юзеру — и всё, пизда рулю.
val skewedAgg = eventLogs.groupBy("user_id").count()
// Вся эта орава анонимов с нулевым айдишником влетает в одну партицию. Таска, которая её обрабатывает, становится монстром, который жрёт ресурсы и времени у неё — хуй с горы. Остальные уже сто лет как посчитали свои 10 записей и тупят.
Ну и что делать, ёпта? Способы есть, не кипишуй.
-
Соление ключей (Salting). Хитрая, блядь, жопа. Берёшь этот жирный ключ (тот самый
user_id=0) и начинаешь его дробить. Добавляешь к нему случайную соль — префикс или суффикс.import org.apache.spark.sql.functions._ val saltNum = 100 // Сколько кусочков от этого пирога отломить // Солим перед агрегацией val saltedDF = eventLogs .withColumn("salted_key", concat(col("user_id"), lit("_"), (rand() * saltNum).cast("int"))) // Теперь user_id=0 размажется на 100 ключей: 0_1, 0_2, ... 0_99 .groupBy("salted_key") // Группируем по посоленному ключу — нагрузка распределится .agg(sum("amount").as("partial_sum")) .withColumn("original_key", split(col("salted_key"), "_").getItem(0)) // Возвращаем оригинальный ключ .groupBy("original_key").agg(sum("partial_sum").as("total_sum")) // И финально агрегируем уже по нормальному ключуСуть в том, чтобы тяжёлый ключ размазать на много мелких, обработать их параллельно, а потом слепить обратно. Удивление пиздец, но работает.
-
Broadcast Join. Это когда одна таблица такая мелкая, что её по карманам раздать можно. Если ты джойнишь большую таблицу с маленькой, то можно маленькую просто скопировать на каждый экзекьютор, и тогда никакого шаффла — а значит, и перекоса — не будет вообще.
// smallDF — такая мартышлюшка, помещается в память. Броадкастим её. val result = largeDF.join(broadcast(smallDF), "key")Главное — не обосраться с оценкой размера. Если таблица на самом деле не маленькая, а ты её броадкастнешь, будет тебе хиросима, чувак.
-
Репартиционирование. Иногда помогает просто увеличить число партиций, чтобы данные лучше перемешались. Но это как пытаться равномерно размешать ложкой сахар в стакане, где на дне целая гора. Если перекос дикий, одной ложкой не поможешь.
val repartitionedDF = df.repartition(200, col("key")) // Дробим на 200 кучек вместо дефолтных 200Может помочь, а может и нет. Часто это просто полумера.
-
Skew Join Hint (в Spark 3.0 и выше). Это когда ты можешь прямо намекнуть Спарку: «Слушай, дружище, у меня тут в колонке
customer_idзначения1, 7, 9— это пиздопроебибна, жирные такие, с ними аккуратнее». И Спарк постарается их особым образом обработать.SELECT /*+ SKEW('orders', 'customer_id', (1, 7, 9)) */ * FROM orders JOIN customers ON orders.customer_id = customers.id;Удобно, но нужно заранее знать, какие ключи — говно.
В общем, выбор способа — это как лечение: смотришь по симптомам. Иногда достаточно броадкаста, иногда надо солить как селёдку. Главное — не игнорировать проблему, а то твой джоб будет висеть вечностью, и доверия к тебе как к инженеру будет, блядь, ноль.