Как понять, на какую ноду попадёт строка при shuffle в Spark?

«Как понять, на какую ноду попадёт строка при shuffle в Spark?» — вопрос из категории Apache Spark, который задают на 33% собеседований Data Инженер. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

Распределение строк по партициям (и, следовательно, нодам) во время shuffle определяется партиционером (Partitioner) и значением ключа партиционирования.

Основной механизм (по умолчанию): Spark использует HashPartitioner. Номер партиции для строки вычисляется как: partitionId = hash(partitioningKey) % numPartitions

Пример в PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id

spark = SparkSession.builder.appName("Partitioning").getOrCreate()
df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C"), (1, "A2")], ["user_id", "data"])

# Выполняем репартиционирование по user_id
repartitioned_df = df.repartition(4, "user_id")

# Смотрим, в какую партицию попала каждая строка
repartitioned_df.withColumn("partition_id", spark_partition_id()).show()
# +-------+----+------------+
# |user_id|data|partition_id|
# +-------+----+------------+
# |      1|   A|           1|  # Все строки с user_id=1 попадут в одну партицию
# |      1|  A2|           1|  # (например, partition_id=1)
# |      3|   C|           3|
# |      2|   B|           0|
# +-------+----+------------+

Практические выводы:

  1. Одинаковый ключ = одна партиция. Все строки с одинаковым user_id будут обрабатываться на одной ноде. Это важно для операций groupByKey, reduceByKey.
  2. Качество распределения (избегание skew). Если один ключ (например, user_id=null или default) встречается слишком часто, возникает перекос (data skew) — одна партиция будет огромной, а остальные пустыми. Это убивает производительность.
  3. Выбор ключа. Ключ должен иметь высокую кардинальность (много уникальных значений) для равномерного распределения. Иногда я добавляю случайный префикс (salt) к ключу, чтобы разбить "тяжелые" ключи.

Как проверить? Используйте df.rdd.getNumPartitions() и df.rdd.glom().map(len).collect() чтобы увидеть размер каждой партиции.