Как вы оптимизировали запросы в Apache Spark?

«Как вы оптимизировали запросы в Apache Spark?» — вопрос из категории Apache Spark, который задают на 33% собеседований Data Инженер. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

В моих проектах оптимизация Spark строилась на анализе UI и понимании физического плана выполнения. Вот ключевые методы, которые я применял:

  1. Оптимизация партиционирования:

    • Избегание skew данных: Если при groupBy или join одна партиция становится слишком большой, использую salting или увеличиваю уровень параллелизма.
      // Salting для борьбы со skew при JOIN
      val saltedDF = df.withColumn("salt", (rand() * 100).cast("int"))
    • Сохранение данных с оптимальным числом партиций:
      df.repartition(200, "join_key").write.parquet("path") # Для последующих JOIN
      df.coalesce(10).write.parquet("path") # Для финального вывода малого объема
  2. Использование Adaptive Query Execution (AQE): Включаю всегда. AQE автоматически объединяет мелкие партиции, оптимизирует стратегию JOIN на лету и корректирует число партиций после shuffle.

    spark.conf.set("spark.sql.adaptive.enabled", true)
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", true)
  3. Выбор правильного типа JOIN:

    • Broadcast Hash Join: Для маленьких таблиц (меньше spark.sql.autoBroadcastJoinThreshold, обычно 10 МБ).
    • Sort Merge Join: По умолчанию для больших таблиц. Ключево — партиционирование и сортировка по ключу JOIN.
  4. Кэширование с умом: Кэширую (df.persist(StorageLevel.MEMORY_AND_DISK_SER)) только датафреймы, которые используются многократно в iterative алгоритмах (например, ML) или в нескольких действиях. Всегда оцениваю стоимость через Spark UI.

  5. Работа с форматами данных: Использую колоночные форматы (Parquet, ORC) с предикатным фильтром pushdown. Важно правильно настроить размер блока и страницы.

  6. Настройка ресурсов: Убеждаюсь, что количество ядер и памяти на экзекьюторе соответствует задаче, чтобы избежать излишнего garbage collection. Мониторю вкладки "Executors" и "GC Time" в Spark UI.