Ответ
При работе с таблицами через Spark SQL взаимодействие происходит через два основных компонента: Hive Metastore для метаданных и Hadoop Distributed File System (HDFS) или совместимые объектные хранилища (S3, ADLS) для самих данных.
Процесс чтения:
- Spark SQL запрашивает метаданные таблицы (схему, расположение данных, информацию о партициях, формат файлов) из Hive Metastore.
- Используя эти метаданные, Spark напрямую читает файлы (Parquet, ORC, Avro, текст) из HDFS/S3 через Hadoop FileSystem API, минуя движок Hive.
Процесс записи:
- Spark записывает данные в файлы на HDFS/S3 в указанном формате.
- Затем он обновляет метаданные в Hive Metastore, добавляя информацию о новых партициях или файлах.
Пример на Scala/Spark:
// Включение поддержки Hive (если требуется внешний metastore)
val spark = SparkSession.builder()
.appName("Hive Integration")
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.enableHiveSupport()
.getOrCreate()
// Чтение таблицы 'sales' из Hive Metastore
val df = spark.sql("SELECT * FROM my_db.sales WHERE year = 2024")
// Spark прочитает только партиции year=2024 благодаря partition pruning
// Запись в новую партиционированную Hive-таблицу
df.write
.partitionBy("region", "month")
.mode("overwrite")
.saveAsTable("my_db.sales_aggregated") // Сохраняет данные и регистрирует таблицу в metastore
Ключевые оптимизации:
- Partition Pruning: При фильтрации по партиционированным колонкам (например,
year) Spark читает только соответствующие директории. - Predicate Pushdown: Для колоночных форматов (Parquet, ORC) фильтры применяются на этапе чтения, что сокращает объем загружаемых данных.
- Spark может работать со встроенным Derby metastore (для тестов) или внешним (MySQL, PostgreSQL) для продакшена.