Apache Spark™
Apache Spark™ — распределённая система обработки данных с открытым исходным кодом, предназначенная для обработки больших объёмов данных. Она использует кеширование в оперативной памяти и обеспечивает оптимальное выполнение запросов для быстрого анализа данных любого размера. Предоставляется API для разработки на Java, Scala, Python и R, а также поддерживается повторное использование кода в различных сценариях: пакетная обработка, интерактивные запросы, аналитика в реальном времени, машинное обучение и обработка графов. Apache Spark™ может работать с YDB с помощью YDB Spark Connector — специального модуля, предоставляющего реализацию основных примитивов Apache Spark™. Поддерживаются:
- Распределение операций по партициям таблиц YDB;
- Параллельная запись и чтение таблиц YDB;
- Автоматическое создание таблиц при их отсутствии.
Примечание
Для более быстрой работы YDB Spark Connector может потребоваться увеличить объем памяти, доступной для каждого исполнителя Apache Spark™. Рекомендуется указывать 4 ГБ или больше на один executor.
Где взять и как использовать
Для использования YDB в Apache Spark™ необходимо добавить YDB Spark Connector в драйвер Apache Spark™. Это можно сделать несколькими способами:
-
Загрузить коннектор напрямую из Maven Central с помощью опции
--packages
. Рекомендуется использовать последнюю опубликованную версию:Spark ShellPySparkSpark SQL~ $ spark-shell --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 --conf spark.executor.memory=4g
~ $ pyspark --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 --conf spark.executor.memory=4g
~ $ spark-sql --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 --conf spark.executor.memory=4g
-
Скачать последнюю версию shaded-сборки (вариант коннектора, включающий все зависимости) из GitHub или Maven Central и указать скачанный артефакт в опции
--jars
:Spark ShellPySparkSpark SQL~ $ spark-shell --master <master-url> --jars ~/Download/ydb-spark-connector-shaded-2.0.1.jar --conf spark.executor.memory=4g
~ $ pyspark --master <master-url> --jars ~/Download/ydb-spark-connector-shaded-2.0.1.jar --conf spark.executor.memory=4g
~ $ spark-sql --master <master-url> --jars ~/Download/ydb-spark-connector-shaded-2.0.1.jar --conf spark.executor.memory=4g
-
Также скачанную shaded-сборку можно скопировать в папку
jars
дистрибутива Apache Spark™. В таком случае никаких опций указывать не требуется.
Работа через DataFrame API
DataFrame API позволяет работать с YDB в интерактивных spark-shell
или pyspark
, а также при написании кода на Java
, Scala
или Python
для spark-submit
.
Для создания DataFrame
нужно указать формат ydb
, передать набор опций подключения и путь к таблице YDB:
val ydb_df = spark.read.format("ydb").options(<options>).load("<table-path>")
ydb_df = spark.read.format("ydb").options(<options>).load("<table-path>")
Для сохранения любого DataFrame
в таблицу YDB аналогично нужно указать формат ydb
, опции подключения и путь к таблице:
any_dataframe.write.format("ydb").options(<options>).mode("append").save("<table-path>")
any_dataframe.write.format("ydb").options(<options>).mode("append").save("<table-path>")
Примечание
При записи данных в YDB рекомендуется использовать режим append
, который использует пакетную загрузку данных. Если указанная в методе save()
таблица не существует, она будет создана в соответствии с опциями автосоздания таблиц.
Более подробный пример приведён в разделе Пример работы с YDB в spark-shell.
Работа через Catalog API
С помощью каталогов возможна работа с YDB в интерактивном режиме spark-sql
или при выполнении SQL-запросов через метод spark.sql
.
В этом случае для доступа к YDB нужно добавить каталог, указав следующие опции Apache Spark™ (можно определить несколько каталогов с разными именами для разных баз данных YDB):
# Обязательный параметр типа каталога
spark.sql.catalog.<catalog_name>=tech.ydb.spark.connector.YdbCatalog
# Обязательный параметр url
spark.sql.catalog.<catalog_name>.url=<ydb-connection-url>
# Все остальные параметры не обязательны и указываются только при необходимости
spark.sql.catalog.<catalog_name>.<param-name>=<param-value>
После задания каталогов можно работать с таблицами YDB через стандартные SQL-запросы Apache Spark™. Обратите внимание, что в качестве разделителя в пути к таблице нужно использовать .
:
SELECT * FROM <catalog_name>.<table-path> LIMIT 10;
Более подробный пример приведён в разделе Пример работы с YDB в spark-sql.
Список параметров YDB Spark Connector
Поведение YDB Spark Connector настраивается с помощью опций, которые могут передаваться в виде одного набора с помощью метода options
или указываться по одной с помощью метода option
. При этом каждый DataFrame
и даже каждая отдельная операция над DataFrame
может иметь свой набор опций.
Опции подключения
-
url
— обязательный параметр с адресом подключения к YDB. Имеет видgrpc[s]://<endpoint>:<port>/<database>[?<options>]
.
Примеры использования:- Локальный Docker-контейнер с анонимной аутентификацией и без TLS:
grpc://localhost:2136/local
- Удалённый кластер, размещённый на собственном сервере:
grpcs://my-private-cluster:2135/Root/my-database?secureConnectionCertificate=~/myCertificate.cer
- Экземпляр облачной базы данных с токеном:
grpcs://ydb.my-cloud.com:2135/my_folder/test_database?tokenFile=~/my_token
- Экземпляр облачной базы данных с файлом сервисного аккаунта:
grpcs://ydb.my-cloud.com:2135/my_folder/test_database?saKeyFile=~/sa_key.json
- Локальный Docker-контейнер с анонимной аутентификацией и без TLS:
-
auth.use_env
— если указаноtrue
, используется режим аутентификации на основе переменных среды окружения. -
auth.use_metadata
— если указаноtrue
, используется режим аутентификации Metadata. Может быть указан прямо вurl
в виде опцииuseMetadata
. -
auth.login
иauth.password
— логин и пароль для статической аутентификации. -
auth.token
— аутентификация с использованием указанного Access Token. -
auth.token.file
— аутентификация с использованием Access Token из указанного файла. Может быть указан прямо вurl
в виде опцииtokenFile
. -
auth.ca.text
— указывает значение сертификата для установки TLS-соединения. -
auth.ca.file
— указывает путь к сертификату для установки TLS-соединения. Может быть указан прямо вurl
в виде опцииsecureConnectionCertificate
. -
auth.sakey.text
— можно указать содержимое ключа для аутентификации по ключу сервисного аккаунта. -
auth.sakey.file
— можно указать путь к файлу ключа для аутентификации по ключу сервисного аккаунта. Может быть указан прямо вurl
в виде опцииsaKeyFile
.
Опции автоматического создания таблиц
Совет
Если вам нужно настроить параметры таблицы, создайте её вручную заранее с помощью CREATE TABLE или измените их позже с помощью ALTER TABLE.
table.autocreate
— если указаноtrue
, то при записи в несуществующую таблицу она будет создана автоматически. По умолчанию включено.table.type
— тип автоматически создаваемой таблицы. Возможные варианты:row
— для создания строчной таблицы (по умолчанию);column
— для создания колоночной таблицы;
table.primary_keys
— разделённый запятой список колонок для использования в качестве первичного ключа. При отсутствии этой опции для ключа будет автоматически создана новая колонка.table.auto_pk_name
— имя колонки для автоматически создаваемого ключа. Эта колонка будет создана с типомUtf8
и будет заполняться случайными UUID v4 значениями. По умолчанию_spark_key
.
Пример работы с YDB в spark-shell и pyspark
В качестве примера покажем, как загрузить в YDB список всех постов StackOverflow за 2020 год. Эти данные доступны в виде Parquet-файла, расположенного по адресу https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/2020.parquet:
~ $ spark-shell --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 --conf spark.executor.memory=4g
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.4
/_/
Using Scala version 2.12.18 (OpenJDK 64-Bit Server VM, Java 17.0.15)
Type in expressions to have them evaluated.
Type :help for more information.
~ $ pyspark --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 --conf spark.executor.memory=4g
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.5.4
/_/
Using Python version 3.10.12 (main, May 27 2025 17:12:29)
SparkSession available as 'spark'.
Выведем схему файла с данными и посмотрим количество строк в нём:
scala> val so_posts2020 = spark.read.format("parquet").load("/home/username/2020.parquet")
so_posts2020: org.apache.spark.sql.DataFrame = [Id: bigint, PostTypeId: bigint ... 20 more fields]
scala> so_posts2020.printSchema
root
|-- Id: long (nullable = true)
|-- PostTypeId: long (nullable = true)
|-- AcceptedAnswerId: long (nullable = true)
|-- CreationDate: timestamp (nullable = true)
|-- Score: long (nullable = true)
|-- ViewCount: long (nullable = true)
|-- Body: binary (nullable = true)
|-- OwnerUserId: long (nullable = true)
|-- OwnerDisplayName: binary (nullable = true)
|-- LastEditorUserId: long (nullable = true)
|-- LastEditorDisplayName: binary (nullable = true)
|-- LastEditDate: timestamp (nullable = true)
|-- LastActivityDate: timestamp (nullable = true)
|-- Title: binary (nullable = true)
|-- Tags: binary (nullable = true)
|-- AnswerCount: long (nullable = true)
|-- CommentCount: long (nullable = true)
|-- FavoriteCount: long (nullable = true)
|-- ContentLicense: binary (nullable = true)
|-- ParentId: binary (nullable = true)
|-- CommunityOwnedDate: timestamp (nullable = true)
|-- ClosedDate: timestamp (nullable = true)
scala> so_posts2020.count
res1: Long = 4304021
>>> so_posts2020 = spark.read.format("parquet").load("/home/username/2020.parquet")
>>> so_posts2020.printSchema()
root
|-- Id: long (nullable = true)
|-- PostTypeId: long (nullable = true)
|-- AcceptedAnswerId: long (nullable = true)
|-- CreationDate: timestamp (nullable = true)
|-- Score: long (nullable = true)
|-- ViewCount: long (nullable = true)
|-- Body: binary (nullable = true)
|-- OwnerUserId: long (nullable = true)
|-- OwnerDisplayName: binary (nullable = true)
|-- LastEditorUserId: long (nullable = true)
|-- LastEditorDisplayName: binary (nullable = true)
|-- LastEditDate: timestamp (nullable = true)
|-- LastActivityDate: timestamp (nullable = true)
|-- Title: binary (nullable = true)
|-- Tags: binary (nullable = true)
|-- AnswerCount: long (nullable = true)
|-- CommentCount: long (nullable = true)
|-- FavoriteCount: long (nullable = true)
|-- ContentLicense: binary (nullable = true)
|-- ParentId: binary (nullable = true)
|-- CommunityOwnedDate: timestamp (nullable = true)
|-- ClosedDate: timestamp (nullable = true)
>>> so_posts2020.count()
4304021
Добавим к данному DataFrame новую колонку с годом и запишем всё это в колоночную таблицу YDB:
scala> val my_ydb = Map("url" -> "grpcs://ydb.my-host.net:2135/preprod/spark-test?tokenFile=~/.token")
my_ydb: scala.collection.immutable.Map[String,String] = Map(url -> grpcs://ydb.my-host.net:2135/preprod/spark-test?tokenFile=~/.token)
scala> so_posts2020.withColumn("Year", lit(2020)).write.format("ydb").options(my_ydb).option("table.type", "column").option("table.primary_keys", "Id").mode("append").save("stackoverflow/posts");
>>> from pyspark.sql.functions import col,lit
>>> my_ydb = {"url": "grpcs://ydb.my-host.net:2135/preprod/spark-test?tokenFile=~/.token"}
>>> so_posts2020.withColumn("Year", lit(2020)).write.format("ydb").options(**my_ydb).option("table.type", "column").option("table.primary_keys", "Id").mode("append").save("stackoverflow/posts")
В итоге мы можем прочитать записанные данные из YDB и, например, подсчитать количество постов c подтверждённым ответом:
scala> val ydb_posts2020 = spark.read.format("ydb").options(my_ydb).load("stackoverflow/posts")
ydb_posts2020: org.apache.spark.sql.DataFrame = [Id: bigint, PostTypeId: bigint ... 21 more fields]
scala> ydb_posts2020.printSchema
root
|-- Id: long (nullable = false)
|-- PostTypeId: long (nullable = true)
|-- AcceptedAnswerId: long (nullable = true)
|-- CreationDate: timestamp (nullable = true)
|-- Score: long (nullable = true)
|-- ViewCount: long (nullable = true)
|-- Body: binary (nullable = true)
|-- OwnerUserId: long (nullable = true)
|-- OwnerDisplayName: binary (nullable = true)
|-- LastEditorUserId: long (nullable = true)
|-- LastEditorDisplayName: binary (nullable = true)
|-- LastEditDate: timestamp (nullable = true)
|-- LastActivityDate: timestamp (nullable = true)
|-- Title: binary (nullable = true)
|-- Tags: binary (nullable = true)
|-- AnswerCount: long (nullable = true)
|-- CommentCount: long (nullable = true)
|-- FavoriteCount: long (nullable = true)
|-- ContentLicense: binary (nullable = true)
|-- ParentId: binary (nullable = true)
|-- CommunityOwnedDate: timestamp (nullable = true)
|-- ClosedDate: timestamp (nullable = true)
|-- Year: integer (nullable = true)
scala> ydb_posts2020.count
res3: Long = 4304021
scala> ydb_posts2020.filter(col("AcceptedAnswerId") > 0).count
res4: Long = 843780
>>> ydb_posts2020 = spark.read.format("ydb").options(**my_ydb).load("stackoverflow/posts")
>>> ydb_posts2020.printSchema()
root
|-- Id: long (nullable = true)
|-- PostTypeId: long (nullable = true)
|-- AcceptedAnswerId: long (nullable = true)
|-- CreationDate: timestamp (nullable = true)
|-- Score: long (nullable = true)
|-- ViewCount: long (nullable = true)
|-- Body: binary (nullable = true)
|-- OwnerUserId: long (nullable = true)
|-- OwnerDisplayName: binary (nullable = true)
|-- LastEditorUserId: long (nullable = true)
|-- LastEditorDisplayName: binary (nullable = true)
|-- LastEditDate: timestamp (nullable = true)
|-- LastActivityDate: timestamp (nullable = true)
|-- Title: binary (nullable = true)
|-- Tags: binary (nullable = true)
|-- AnswerCount: long (nullable = true)
|-- CommentCount: long (nullable = true)
|-- FavoriteCount: long (nullable = true)
|-- ContentLicense: binary (nullable = true)
|-- ParentId: binary (nullable = true)
|-- CommunityOwnedDate: timestamp (nullable = true)
|-- ClosedDate: timestamp (nullable = true)
|-- Year: integer (nullable = true)
>>> ydb_posts2020.count()
4304021
>>> ydb_posts2020.filter(col("AcceptedAnswerId") > 0).count()
843780
Пример работы с YDB в spark-sql
В качестве примера покажем, как загрузить в YDB список всех постов StackOverflow за 2020 год. Эти данные доступны в виде Parquet-файла, расположенного по адресу https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/2020.parquet:
Для начала запустим spark-sql
с настроенным каталогом my_ydb
:
~ $ spark-sql --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 \
--conf spark.sql.catalog.my_ydb=tech.ydb.spark.connector.YdbCatalog \
--conf spark.sql.catalog.my_ydb.url=grpcs://ydb.my-host.net:2135/preprod/spark-test \
--conf spark.sql.catalog.my_ydb.auth.token.file=~/.token \
--conf spark.executor.memory=4g
spark-sql (default)>
Просмотрим текущее содержимое подключенной базы данных и убедимся в отсутствии таблицы stackoverflow/posts
:
spark-sql (default)> SHOW NAMESPACES FROM my_ydb;
stackoverflow
Time taken: 0.11 seconds, Fetched 1 row(s)
spark-sql (default)> SHOW TABLES FROM my_ydb.stackoverflow;
Time taken: 0.041 seconds
Подсчитаем количество строк в оригинальном файле:
spark-sql (default)> SELECT COUNT(*) FROM parquet.`/home/username/2020.parquet`;
4304021
Добавим новую колонку с годом и запишем всё это в таблицу YDB:
spark-sql (default)> CREATE TABLE my_ydb.stackoverflow.posts OPTIONS(table.primary_keys='Id') AS SELECT *, 2020 as Year FROM parquet.`/home/username/2020.parquet`;
Time taken: 85.225 seconds
Убедимся, что новая таблица появилась в базе данных YDB:
spark-sql (default)> SHOW TABLES FROM my_ydb.stackoverflow;
posts
Time taken: 0.07 seconds, Fetched 1 row(s)
spark-sql (default)> DESCRIBE TABLE my_ydb.stackoverflow.posts;
Id bigint
PostTypeId bigint
AcceptedAnswerId bigint
CreationDate timestamp
Score bigint
ViewCount bigint
Body binary
OwnerUserId bigint
OwnerDisplayName binary
LastEditorUserId bigint
LastEditorDisplayName binary
LastEditDate timestamp
LastActivityDate timestamp
Title binary
Tags binary
AnswerCount bigint
CommentCount bigint
FavoriteCount bigint
ContentLicense binary
ParentId binary
CommunityOwnedDate timestamp
ClosedDate timestamp
Year int
В итоге мы можем прочитать записанные данные из YDB и, например, подсчитать количество постов с подтверждённым ответом:
spark-sql (default)> SELECT COUNT(*) FROM my_ydb.stackoverflow.posts;
4304021
Time taken: 19.726 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT COUNT(*) FROM my_ydb.stackoverflow.posts WHERE AcceptedAnswerId > 0;
843780
Time taken: 6.599 seconds, Fetched 1 row(s)