Ответ
В Apache Spark чтение и запись данных абстрагированы через API DataFrameReader и DataFrameWriter. Я использую их для работы с различными форматами и источниками.
Чтение данных:
Основной метод — spark.read, который возвращает DataFrameReader. Можно явно указать формат и опции.
# Чтение Parquet-файлов
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
df_parquet = spark.read.parquet("/data/events/*.parquet")
# Чтение CSV с указанием схемы и опций
schema = "id INT, name STRING, value DOUBLE"
df_csv = spark.read
.format("csv")
.schema(schema)
.option("header", "true")
.option("delimiter", ";")
.load("/data/file.csv")
# Чтение из JDBC-источника
df_jdbc = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost/db")
.option("dbtable", "sales")
.option("user", "user")
.option("password", "pass")
.load()
Запись данных:
Запись выполняется через DataFrame.write, возвращающий DataFrameWriter. Критически важно выбрать правильный режим сохранения.
# Запись в Parquet с перезаписью существующих данных
df.write.mode("overwrite").parquet("/output/data.parquet")
# Запись в партиционированном виде
df.write
.mode("append")
.partitionBy("date", "country")
.parquet("/output/partitioned_data")
# Запись в несколько форматов с кастомными настройками
df.write
.format("json")
.mode("error")
.option("compression", "gzip")
.save("/output/data.json.gz")
Ключевые особенности, которые я учитываю:
- Ленивые вычисления: Сами операции
readиwriteне выполняются немедленно. Они являются частью графа вычислений, который запускается при вызове действия (action). - Схема: При чтении JSON или CSV схема может быть выведена автоматически (что дорого для больших файлов), поэтому я часто задаю ее вручную через
.schema()для эффективности и контроля типов данных. - Партицирование: При записи в файловые системы партицирование по ключевым колонкам (например, по дате) кардинально ускоряет последующие чтения, которые фильтруют по этим колонкам.