Приходилось ли распараллеливать задачи в Apache Spark

Ответ

Да, постоянно работаю с распараллеливанием в Apache Spark. Вот ключевые аспекты из моего опыта:

1. Базовые механизмы параллелизма в Spark

RDD (Resilient Distributed Datasets):

from pyspark import SparkContext

sc = SparkContext("local[*]", "ParallelExample")

# Создание RDD и параллельная обработка
data = sc.parallelize(range(1, 1000001))

# Операции выполняются параллельно на всех ядрах
squared = data.map(lambda x: x * x)
filtered = squared.filter(lambda x: x % 2 == 0)
result = filtered.reduce(lambda a, b: a + b)

print(f"Result: {result}")

DataFrame API (более современный подход):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder 
    .appName("ParallelProcessing") 
    .getOrCreate()

# Создание DataFrame
df = spark.range(1, 1000001)

# UDF выполняется параллельно на всех executor'ах
square_udf = udf(lambda x: x * x, IntegerType())
df_squared = df.withColumn("squared", square_udf(col("id")))

# Агрегация с параллельным выполнением
result = df_squared.agg({"squared": "sum"}).collect()[0][0]
print(f"Sum of squares: {result}")

2. Управление параллелизмом через репартиционирование

# Исходные данные с малым числом партиций
df = spark.read.csv("large_dataset.csv", header=True)
print(f"Initial partitions: {df.rdd.getNumPartitions()}")

# Репартиционирование для оптимального параллелизма
df_repartitioned = df.repartition(100)  # Явное указание числа партиций
# или
df_repartitioned = df.repartition("category_column")  # По ключу

# Coalesce для уменьшения числа партиций (без shuffle)
df_coalesced = df_repartitioned.coalesce(10)

3. Параллельная обработка с окнами и агрегациями

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

window_spec = Window.partitionBy("department").orderBy("salary")

df_with_rank = df.withColumn("rank", rank().over(window_spec)) 
                 .withColumn("dense_rank", dense_rank().over(window_spec))

# Каждая department обрабатывается параллельно в отдельной партиции

4. Оптимизация параллельного выполнения

Конфигурация для кластера:

spark = SparkSession.builder 
    .appName("OptimizedJob") 
    .config("spark.executor.instances", "8") 
    .config("spark.executor.cores", "4") 
    .config("spark.executor.memory", "8g") 
    .config("spark.sql.shuffle.partitions", "200") 
    .getOrCreate()

5. Работа с партиционированными данными

# Чтение партиционированных данных
partitioned_df = spark.read.parquet("/data/partitioned/") 
    .where("year = 2023 AND month = 12")

# Параллельная запись с партиционированием
df.write 
    .partitionBy("year", "month", "day") 
    .mode("overwrite") 
    .parquet("/output/partitioned_data/")

6. Распараллеливание сложных пайплайнов

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier

# Все стадии пайплайна выполняются параллельно
assembler = VectorAssembler(inputCols=["feature1", "feature2"], 
                           outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
rf = RandomForestClassifier(featuresCol="scaled_features", 
                           labelCol="label")

pipeline = Pipeline(stages=[assembler, scaler, rf])
model = pipeline.fit(training_data)  # Параллельное обучение на всех ядрах

Ключевые уроки из практики:

  • Оптимальное число партиций ≈ 2-4 × число ядер в кластере
  • Избегаю skew данных через salting или кастомное партиционирование
  • Использую broadcast для небольших таблиц в join'ах
  • Мониторю UI Spark для выявления bottlenecks в параллельном выполнении
  • Для I/O операций настраиваю параллельное чтение/запись через увеличение числа партиций

Ответ 18+ 🔞

А, распараллеливание в Spark? Да я с этой штукой, можно сказать, на короткой ноге, ебать мои старые костыли. Сидишь, бывало, смотришь на свой код, который ползает как черепаха, и думаешь — «ну ёпта, какого хуя?». А потом включаешь мозги и начинаешь раскидывать задачи по ядрам, и всё летает. Сейчас расскажу, как не облажаться.

1. Базовые штуки, с которых всё начинается

RDD (Эти ваши Resilient Distributed Datasets): Ну, классика жанра, старичок. Создаёшь кучу данных и говоришь Spark'у: «чувак, разбей это на куски и гони параллельно».

from pyspark import SparkContext

sc = SparkContext("local[*]", "ParallelExample")

# Создание RDD и параллельная обработка
data = sc.parallelize(range(1, 1000001))

# Операции выполняются параллельно на всех ядрах
squared = data.map(lambda x: x * x)
filtered = squared.filter(lambda x: x % 2 == 0)
result = filtered.reduce(lambda a, b: a + b)

print(f"Result: {result}")

Вот тут главное — local[*]. Эта звёздочка значит «возьми все ядра, какие есть, и давай, не стесняйся». Без неё будет один поток и овердохуища тоски.

DataFrame API (Это уже посерьёзнее): Тут уже ближе к SQL, удобнее, но суть та же — параллелим всё, что можно.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder 
    .appName("ParallelProcessing") 
    .getOrCreate()

# Создание DataFrame
df = spark.range(1, 1000001)

# UDF выполняется параллельно на всех executor'ах
square_udf = udf(lambda x: x * x, IntegerType())
df_squared = df.withColumn("squared", square_udf(col("id")))

# Агрегация с параллельным выполнением
result = df_squared.agg({"squared": "sum"}).collect()[0][0]
print(f"Sum of squares: {result}")

Только с UDF осторожно — они хоть и параллелятся, но иногда тормозят как вротберунчик, потому что Питон не JVM. Но если без них никуда — то да, жми.

2. Репартиционирование, или «разложим всё по полочкам»

Бывает, данные приходят одним комком, в одной партиции. Это пиздец как неэффективно. Один поток пашет, остальные десять кофе пьют.

# Исходные данные с малым числом партиций
df = spark.read.csv("large_dataset.csv", header=True)
print(f"Initial partitions: {df.rdd.getNumPartitions()}")

# Репартиционирование для оптимального параллелизма
df_repartitioned = df.repartition(100)  # Явное указание числа партиций
# или
df_repartitioned = df.repartition("category_column")  # По ключу

# Coalesce для уменьшения числа партиций (без shuffle)
df_coalesced = df_repartitioned.coalesce(10)

repartition — это тяжёлая артиллерия, делает полный shuffle. coalesce — полегче, просто сливает соседние партиции, shuffle не делает. Выбирай по ситуации, а то можно так наоптимизировать, что кластер загнётся.

3. Оконные функции — чтобы всё летало группами

Тут каждая группа (department) обрабатывается в своей песочнице, параллельно другим. Красота.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank

window_spec = Window.partitionBy("department").orderBy("salary")

df_with_rank = df.withColumn("rank", rank().over(window_spec)) 
                 .withColumn("dense_rank", dense_rank().over(window_spec))

# Каждая department обрабатывается параллельно в отдельной партиции

Главное, чтобы данных по группам было примерно поровну, а не так, что в одной группе миллион записей, а в остальных — по три штуки. Это называется skew, и это — хитрая жопа, которая всё портит.

4. Настройка кластера — тут без магии, только знание

Ты же не хочешь, чтобы твои executor'ы тупили от нехватки памяти или дрались за одно ядро? Вот и настрой.

spark = SparkSession.builder 
    .appName("OptimizedJob") 
    .config("spark.executor.instances", "8") 
    .config("spark.executor.cores", "4") 
    .config("spark.executor.memory", "8g") 
    .config("spark.sql.shuffle.partitions", "200") 
    .getOrCreate()

spark.sql.shuffle.partitions — священная корова. По умолчанию их 200. Если данных овердохуища, увеличивай. Если мало — уменьшай, а то overhead на управление партициями сожрёт всё время.

5. Партиционированные данные — читай умно, пиши с умом

Если данные уже лежат, разбитые по папкам year=2023/month=12, то Spark это оценит и будет читать только нужное, параллельно.

# Чтение партиционированных данных
partitioned_df = spark.read.parquet("/data/partitioned/") 
    .where("year = 2023 AND month = 12")

# Параллельная запись с партиционированием
df.write 
    .partitionBy("year", "month", "day") 
    .mode("overwrite") 
    .parquet("/output/partitioned_data/")

Пиши тоже с партиционированием — потом и тебе, и другим будет легче. Это как разложить носки по ящикам, а не скинуть в одну кучу.

6. Сложные пайплайны — тоже параллелятся

ML пайплайн — не проблема. Каждая стадия может грести на всех доступных ядрах.

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier

# Все стадии пайплайна выполняются параллельно
assembler = VectorAssembler(inputCols=["feature1", "feature2"], 
                           outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
rf = RandomForestClassifier(featuresCol="scaled_features", 
                           labelCol="label")

pipeline = Pipeline(stages=[assembler, scaler, rf])
model = pipeline.fit(training_data)  # Параллельное обучение на всех ядрах

Рандомный лес, кстати, сам по себе отлично параллелится на уровне деревьев.

Итоговые советы, выстраданные на крови:

  • Сколько партиций? Правило большого пальца: в 2-4 раза больше, чем ядер в кластере. Меньше — будут простаивать. Больше — overhead замучает.
  • Skew данных — враг. Если один ключ в groupBy или join слишком жирный, всё встанет. Спасайся salting — подсыпай случайный префикс к ключу, чтобы размазать нагрузку.
  • Маленькую таблицу — в broadcast. Если джойнишь огромную таблицу с маленькой, отдай Spark'у команду broadcast. Он разошлёт её копию на все ноды, и join станет в разы быстрее. Доверия к его автоопределению — ебать ноль, лучше указать вручную.
  • Смотри в UI Spark. Там, в Stages, видно, где твой код уперся в стену. Висит одна задача два часа, а остальные давно кончили? Вот он, skew, мать его.
  • Для чтения/записи тоже увеличивай партиции, особенно если источник — медленный. Больше потоков — быстрее управимся.

Вот так, вкратце. Главное — понимать, что ты делаешь, а не тыкать настройки наугад. А то будет тебе не оптимизация, а ёперный театр.