Ответ
Проблемы с памятью (OOM) в Spark — частое явление при работе с большими или несбалансированными данными. Вот конкретные методы, которые я применял:
1. Оптимизация партиционирования Неравномерное распределение данных (skew) — главная причина OOM.
- Увеличение числа партиций:
df.repartition(200)или настройкаspark.sql.shuffle.partitions(по умолчанию 200). Это уменьшает размер каждой партии для операцийshuffle. - Солянка (salting) для skew join: При джойне с "тяжелыми" ключами добавляю случайный префикс к ключам в большом датафрейме, чтобы равномерно распределить нагрузку.
2. Правильное использование операций
- Broadcast Join для маленьких таблиц: Если одна из таблиц помещается в память исполнителя.
from pyspark.sql.functions import broadcast large_df.join(broadcast(small_df), "key") - Избегание
collect(): Эта операция собирает все данные на драйвер. Вместо этого используюtake(),show()или записываю результаты прямо в распределенное хранилище (HDFS, S3).
3. Настройка параметров памяти
Ключевые параметры в spark-defaults.conf или через SparkSession:
spark.conf.set("spark.executor.memory", "8g")
spark.conf.set("spark.memory.fraction", "0.8") # Доля памяти для исполнения и хранения
spark.conf.set("spark.memory.storageFraction", "0.3") # Доля зарезервированная для хранения
4. Стратегии кэширования
Использую MEMORY_AND_DISK_SER вместо MEMORY_ONLY. Сериализованные данные занимают меньше места, а Spark сбросит на диск то, что не поместилось в память, вместо падения с OOM.
df.persist(StorageLevel.MEMORY_AND_DISK_SER)
5. Мониторинг и анализ Использую Spark UI для отслеживания стадий (Stages), времени выполнения задач (Tasks) и обнаружения skew в данных (размеры партиций в Shuffle Read).