Ответ
Распределение строк по партициям (и, следовательно, нодам) во время shuffle определяется партиционером (Partitioner) и значением ключа партиционирования.
Основной механизм (по умолчанию):
Spark использует HashPartitioner. Номер партиции для строки вычисляется как:
partitionId = hash(partitioningKey) % numPartitions
Пример в PySpark:
from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id
spark = SparkSession.builder.appName("Partitioning").getOrCreate()
df = spark.createDataFrame([(1, "A"), (2, "B"), (3, "C"), (1, "A2")], ["user_id", "data"])
# Выполняем репартиционирование по user_id
repartitioned_df = df.repartition(4, "user_id")
# Смотрим, в какую партицию попала каждая строка
repartitioned_df.withColumn("partition_id", spark_partition_id()).show()
# +-------+----+------------+
# |user_id|data|partition_id|
# +-------+----+------------+
# | 1| A| 1| # Все строки с user_id=1 попадут в одну партицию
# | 1| A2| 1| # (например, partition_id=1)
# | 3| C| 3|
# | 2| B| 0|
# +-------+----+------------+
Практические выводы:
- Одинаковый ключ = одна партиция. Все строки с одинаковым
user_idбудут обрабатываться на одной ноде. Это важно для операцийgroupByKey,reduceByKey. - Качество распределения (избегание skew). Если один ключ (например,
user_id=nullилиdefault) встречается слишком часто, возникает перекос (data skew) — одна партиция будет огромной, а остальные пустыми. Это убивает производительность. - Выбор ключа. Ключ должен иметь высокую кардинальность (много уникальных значений) для равномерного распределения. Иногда я добавляю случайный префикс (
salt) к ключу, чтобы разбить "тяжелые" ключи.
Как проверить? Используйте df.rdd.getNumPartitions() и df.rdd.glom().map(len).collect() чтобы увидеть размер каждой партиции.