Как Apache Spark читает и записывает данные?

«Как Apache Spark читает и записывает данные?» — вопрос из категории Apache Spark, который задают на 33% собеседований Data Инженер. Ниже — развёрнутый ответ с разбором ключевых моментов.

Ответ

В Apache Spark чтение и запись данных абстрагированы через API DataFrameReader и DataFrameWriter. Я использую их для работы с различными форматами и источниками.

Чтение данных: Основной метод — spark.read, который возвращает DataFrameReader. Можно явно указать формат и опции.

# Чтение Parquet-файлов
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

df_parquet = spark.read.parquet("/data/events/*.parquet")

# Чтение CSV с указанием схемы и опций
schema = "id INT, name STRING, value DOUBLE"
df_csv = spark.read 
    .format("csv") 
    .schema(schema) 
    .option("header", "true") 
    .option("delimiter", ";") 
    .load("/data/file.csv")

# Чтение из JDBC-источника
df_jdbc = spark.read 
    .format("jdbc") 
    .option("url", "jdbc:postgresql://localhost/db") 
    .option("dbtable", "sales") 
    .option("user", "user") 
    .option("password", "pass") 
    .load()

Запись данных: Запись выполняется через DataFrame.write, возвращающий DataFrameWriter. Критически важно выбрать правильный режим сохранения.

# Запись в Parquet с перезаписью существующих данных
df.write.mode("overwrite").parquet("/output/data.parquet")

# Запись в партиционированном виде
df.write 
    .mode("append") 
    .partitionBy("date", "country") 
    .parquet("/output/partitioned_data")

# Запись в несколько форматов с кастомными настройками
df.write 
    .format("json") 
    .mode("error") 
    .option("compression", "gzip") 
    .save("/output/data.json.gz")

Ключевые особенности, которые я учитываю:

  • Ленивые вычисления: Сами операции read и write не выполняются немедленно. Они являются частью графа вычислений, который запускается при вызове действия (action).
  • Схема: При чтении JSON или CSV схема может быть выведена автоматически (что дорого для больших файлов), поэтому я часто задаю ее вручную через .schema() для эффективности и контроля типов данных.
  • Партицирование: При записи в файловые системы партицирование по ключевым колонкам (например, по дате) кардинально ускоряет последующие чтения, которые фильтруют по этим колонкам.