Apache Spark™
Apache Spark™ is an open-source, distributed processing system used for big data workloads. It utilizes in-memory caching and optimized query execution for fast analytic queries against data of any size. It provides development APIs in Java, Scala, Python, and R, and supports code reuse across multiple workloads: batch processing, interactive queries, real-time analytics, machine learning, and graph processing.

Apache Spark™ can work with YDB using the YDB Spark Connector, a special module that implements core Apache Spark™ primitives. The connector supports:
- Distribution of operations across YDB table partitions
- Scalable reading from and writing to YDB tables
- Automatic creation of tables if they do not exist
Note
The connector may require additional memory on the Apache Spark™ executor side to achieve good performance. 4 GB or more of memory per executor is highly recommended.
How to Use
To work with YDB in Apache Spark™, you need to add the YDB Spark Connector to your Apache Spark™ driver. This can be done in several ways:
- Download the connector dependency directly from Maven Central using the --packages option. It's recommended to use the latest published version.

  Spark Shell:

  ~ $ spark-shell --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 --conf spark.executor.memory=4g

  PySpark:

  ~ $ pyspark --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 --conf spark.executor.memory=4g

  Spark SQL:

  ~ $ spark-sql --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 --conf spark.executor.memory=4g
- Download the latest version of the shaded connector (a connector build that includes all dependencies) from GitHub or Maven Central and specify the downloaded artifact in the --jars option.

  Spark Shell:

  ~ $ spark-shell --master <master-url> --jars ~/Download/ydb-spark-connector-shaded-2.0.1.jar --conf spark.executor.memory=4g

  PySpark:

  ~ $ pyspark --master <master-url> --jars ~/Download/ydb-spark-connector-shaded-2.0.1.jar --conf spark.executor.memory=4g

  Spark SQL:

  ~ $ spark-sql --master <master-url> --jars ~/Download/ydb-spark-connector-shaded-2.0.1.jar --conf spark.executor.memory=4g
- Copy the downloaded shaded artifact to the jars folder of your Apache Spark™ distribution. In this case, no additional options need to be specified.
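When the connector is used from a standalone PySpark application rather than an interactive shell, the same dependency can also be declared programmatically. The following sketch is only an illustration under that assumption: it relies on the standard spark.jars.packages property (the programmatic counterpart of --packages), and the application name is a placeholder.

from pyspark.sql import SparkSession

# Hypothetical standalone application setup; spark.jars.packages is the
# programmatic equivalent of the --packages command-line option.
spark = (
    SparkSession.builder
    .appName("ydb-spark-demo")                 # placeholder application name
    .config("spark.jars.packages", "tech.ydb.spark:ydb-spark-connector-shaded:2.0.1")
    .config("spark.executor.memory", "4g")     # see the memory recommendation above
    .getOrCreate()
)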
Use DataFrame API
The DataFrame API allows you to work with YDB in an interactive spark-shell or pyspark session, as well as when writing code in Java, Scala, or Python for spark-submit.
To create a DataFrame, you need to specify the ydb format, pass a set of connection options, and provide the path to the YDB table:
Scala:

val ydb_df = spark.read.format("ydb").options(<options>).load("<table-path>")

Python:

ydb_df = spark.read.format("ydb").options(<options>).load("<table-path>")
To save any DataFrame to a YDB table, you similarly need to specify the ydb format, the connection options, and the path to the table:
Scala:

any_dataframe.write.format("ydb").options(<options>).mode("append").save("<table-path>")

Python:

any_dataframe.write.format("ydb").options(<options>).mode("append").save("<table-path>")
Note
For writing data to YDB, it is recommended to use the append mode, which uses batch data loading. If the table specified in the save() method does not exist, it will be created automatically according to the table autocreation options.
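Putting these pieces together, below is a minimal PySpark sketch of a read-transform-write round trip. It is illustrative only: the connection url, token file, column name, and table paths are placeholders, and it assumes an active pyspark session with the connector on the classpath.

# Illustrative sketch; the url, token file, column name, and table paths are placeholders.
connection_options = {
    "url": "grpcs://<endpoint>:2135/<database>?tokenFile=~/.token",
}

# Read an existing YDB table into a DataFrame.
orders = spark.read.format("ydb").options(**connection_options).load("shop/orders")

# Transform it with ordinary DataFrame operations.
recent = orders.filter(orders["created_at"] >= "2024-01-01")

# Write the result back; "append" mode uses batch loading and autocreates the
# target table if it does not exist yet.
recent.write.format("ydb").options(**connection_options).mode("append").save("shop/recent_orders")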
A more detailed example is provided in the Spark Shell and PySpark example below.
Use Catalog API
Catalogs allow you to work with YDB in interactive spark-sql sessions or to execute SQL queries via the spark.sql method.
To access YDB, you need to add a catalog by specifying the following Apache Spark™ properties. You can define multiple catalogs with different names to access different YDB databases:
# Mandatory: the catalog driver class name
spark.sql.catalog.<catalog_name>=tech.ydb.spark.connector.YdbCatalog
# Mandatory: the connection URL of the YDB database
spark.sql.catalog.<catalog_name>.url=<ydb-connection-url>
# Other options are optional and may be specified as needed
spark.sql.catalog.<catalog_name>.<param-name>=<param-value>
After that, you can work with YDB tables through standard Apache Spark™ SQL queries.
Note that you should use a dot (.) as the separator in the table path.
SELECT * FROM <catalog_name>.<table-path> LIMIT 10;
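The catalog properties can also be set programmatically when building the session, after which YDB tables are queried through spark.sql. The sketch below is illustrative: the catalog name my_ydb, the connection URL, and the token file are placeholders, and the table path is taken from the examples later in this article.

from pyspark.sql import SparkSession

# Illustrative catalog setup; the URL, token file, and table path are placeholders.
spark = (
    SparkSession.builder
    .appName("ydb-catalog-demo")
    .config("spark.sql.catalog.my_ydb", "tech.ydb.spark.connector.YdbCatalog")
    .config("spark.sql.catalog.my_ydb.url", "grpcs://<endpoint>:2135/<database>")
    .config("spark.sql.catalog.my_ydb.auth.token.file", "~/.token")
    .getOrCreate()
)

# Standard Spark SQL against a YDB table; note the dot-separated table path.
spark.sql("SELECT * FROM my_ydb.stackoverflow.posts LIMIT 10").show()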
A more detailed example is provided in the Spark SQL example.
YDB Spark Connector Options
The behavior of the YDB Spark Connector is configured using options that can be passed as a single set with the options method or specified individually with the option method. Each DataFrame and each individual operation on a DataFrame can have its own configuration of options.
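For example, shared connection settings can be passed as one dictionary with options, while a single operation adds its own setting with option. The sketch below is illustrative: the url and table paths are placeholders, and the individual options are described in the sections that follow.

# Illustrative only; the url and table paths are placeholders.
base_options = {"url": "grpcs://<endpoint>:2135/<database>?tokenFile=~/.token"}

# This read uses only the shared connection options...
df = spark.read.format("ydb").options(**base_options).load("shop/orders")

# ...while this particular write adds a per-operation option on top of them.
(df.write.format("ydb")
    .options(**base_options)
    .option("table.type", "column")   # only this write targets a column-oriented table
    .mode("append")
    .save("shop/orders_columnar"))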
Connection Options
- url — a required parameter with the YDB connection string in the following format:

  grpc[s]://<endpoint>:<port>/<database>[?<options>]

  Examples:

  - Local Docker container with anonymous authentication and without TLS:
    grpc://localhost:2136/local
  - Remote self-hosted cluster:
    grpcs://my-private-cluster:2135/Root/my-database?secureConnectionCertificate=~/myCertificate.cer
  - Cloud database instance with a token:
    grpcs://ydb.my-cloud.com:2135/my_folder/test_database?tokenFile=~/my_token
  - Cloud database instance with a service account key:
    grpcs://ydb.my-cloud.com:2135/my_folder/test_database?saKeyFile=~/sa_key.json
- auth.use_env — if set to true, authentication based on environment variables will be used.
- auth.use_metadata — if set to true, metadata-based authentication mode will be used. You can specify it directly in url as the useMetadata option.
- auth.login and auth.password — login and password for static authentication.
- auth.token — authentication using the specified access token.
- auth.token.file — authentication using the access token from the specified file. You can specify it directly in url as the tokenFile option.
- auth.ca.text — specifies the certificate value for establishing a TLS connection.
- auth.ca.file — specifies the path to the certificate for establishing a TLS connection. You can specify it directly in url as the secureConnectionCertificate option.
- auth.sakey.text — specifies the key content for authentication with a service account key.
- auth.sakey.file — specifies the path to the key file for authentication with a service account key. You can specify it directly in url as the saKeyFile option, as shown in the sketch below.
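As an illustration, the cloud connection with a service account key from the url examples above can equally be expressed with a separate auth.sakey.file option; the sketch below assumes an active pyspark session named spark.

# Two equivalent ways to describe the same connection (illustrative only).

# 1. The key file packed into the url as a query parameter.
opts_url_only = {
    "url": "grpcs://ydb.my-cloud.com:2135/my_folder/test_database?saKeyFile=~/sa_key.json",
}

# 2. The key file supplied as a separate connector option.
opts_split = {
    "url": "grpcs://ydb.my-cloud.com:2135/my_folder/test_database",
    "auth.sakey.file": "~/sa_key.json",
}

df = spark.read.format("ydb").options(**opts_split).load("stackoverflow/posts")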
Table Autocreation Options
Tip
If you need the table to have some custom settings configured, create it manually beforehand with CREATE TABLE or modify it afterward with ALTER TABLE.
- table.autocreate — if set to true, a non-existent table will be created automatically when writing to it. Enabled by default.
- table.type — the type of the automatically created table. Possible values:
  - row for creating a row-oriented table (default);
  - column for creating a column-oriented table.
- table.primary_keys — a comma-separated list of columns to use as the primary key. If this option is not provided, a new column with random content will be used for the key (see the sketch below this list).
- table.auto_pk_name — the name of the column for the randomly created key. This column will be created with the Utf8 type and filled with random UUID v4 values. Default value is _spark_key.
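For instance, a write that relies on autocreation to produce a column-oriented table keyed by an existing Id column could look like the sketch below; the url is a placeholder, the Parquet file is the one used in the next section, and an active pyspark session is assumed.

# Illustrative sketch; the url is a placeholder.
df = spark.read.format("parquet").load("/home/username/2020.parquet")

write_options = {
    "url": "grpcs://<endpoint>:2135/<database>?tokenFile=~/.token",
    "table.type": "column",        # autocreate a column-oriented table
    "table.primary_keys": "Id",    # use the existing Id column as the primary key
}

df.write.format("ydb").options(**write_options).mode("append").save("stackoverflow/posts")

# Without table.primary_keys, the connector would add a synthetic key column,
# named _spark_key by default (configurable via table.auto_pk_name).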
Spark Shell and PySpark Example
As an example, we'll show how to load a list of all Stack Overflow posts from 2020 into YDB. This data can be downloaded from the following link: https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/2020.parquet
~ $ spark-shell --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 --conf spark.executor.memory=4g
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.5.4
/_/
Using Scala version 2.12.18 (OpenJDK 64-Bit Server VM, Java 17.0.15)
Type in expressions to have them evaluated.
Type :help for more information.
~ $ pyspark --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 --conf spark.executor.memory=4g
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.5.4
/_/
Using Python version 3.10.12 (main, May 27 2025 17:12:29)
SparkSession available as 'spark'.
Let's display the schema of the Parquet file and count the number of rows it contains:
scala> val so_posts2020 = spark.read.format("parquet").load("/home/username/2020.parquet")
so_posts2020: org.apache.spark.sql.DataFrame = [Id: bigint, PostTypeId: bigint ... 20 more fields]
scala> so_posts2020.printSchema
root
|-- Id: long (nullable = true)
|-- PostTypeId: long (nullable = true)
|-- AcceptedAnswerId: long (nullable = true)
|-- CreationDate: timestamp (nullable = true)
|-- Score: long (nullable = true)
|-- ViewCount: long (nullable = true)
|-- Body: binary (nullable = true)
|-- OwnerUserId: long (nullable = true)
|-- OwnerDisplayName: binary (nullable = true)
|-- LastEditorUserId: long (nullable = true)
|-- LastEditorDisplayName: binary (nullable = true)
|-- LastEditDate: timestamp (nullable = true)
|-- LastActivityDate: timestamp (nullable = true)
|-- Title: binary (nullable = true)
|-- Tags: binary (nullable = true)
|-- AnswerCount: long (nullable = true)
|-- CommentCount: long (nullable = true)
|-- FavoriteCount: long (nullable = true)
|-- ContentLicense: binary (nullable = true)
|-- ParentId: binary (nullable = true)
|-- CommunityOwnedDate: timestamp (nullable = true)
|-- ClosedDate: timestamp (nullable = true)
scala> so_posts2020.count
res1: Long = 4304021
>>> so_posts2020 = spark.read.format("parquet").load("/home/username/2020.parquet")
>>> so_posts2020.printSchema()
root
|-- Id: long (nullable = true)
|-- PostTypeId: long (nullable = true)
|-- AcceptedAnswerId: long (nullable = true)
|-- CreationDate: timestamp (nullable = true)
|-- Score: long (nullable = true)
|-- ViewCount: long (nullable = true)
|-- Body: binary (nullable = true)
|-- OwnerUserId: long (nullable = true)
|-- OwnerDisplayName: binary (nullable = true)
|-- LastEditorUserId: long (nullable = true)
|-- LastEditorDisplayName: binary (nullable = true)
|-- LastEditDate: timestamp (nullable = true)
|-- LastActivityDate: timestamp (nullable = true)
|-- Title: binary (nullable = true)
|-- Tags: binary (nullable = true)
|-- AnswerCount: long (nullable = true)
|-- CommentCount: long (nullable = true)
|-- FavoriteCount: long (nullable = true)
|-- ContentLicense: binary (nullable = true)
|-- ParentId: binary (nullable = true)
|-- CommunityOwnedDate: timestamp (nullable = true)
|-- ClosedDate: timestamp (nullable = true)
>>> so_posts2020.count()
4304021
Then let's add a new column with the year to this DataFrame and store the result in a column-oriented YDB table:
scala> val my_ydb = Map("url" -> "grpcs://ydb.my-host.net:2135/preprod/spark-test?tokenFile=~/.token")
my_ydb: scala.collection.immutable.Map[String,String] = Map(url -> grpcs://ydb.my-host.net:2135/preprod/spark-test?tokenFile=~/.token)
scala> so_posts2020.withColumn("Year", lit(2020)).write.format("ydb").options(my_ydb).option("table.type", "column").option("table.primary_keys", "Id").mode("append").save("stackoverflow/posts");
>>> from pyspark.sql.functions import col,lit
>>> my_ydb = {"url": "grpcs://ydb.my-host.net:2135/preprod/spark-test?tokenFile=~/.token"}
>>> so_posts2020.withColumn("Year", lit(2020)).write.format("ydb").options(**my_ydb).option("table.type", "column").option("table.primary_keys", "Id").mode("append").save("stackoverflow/posts")
As a result, you can read the stored data from the YDB table and, for example, count the number of posts that have an accepted answer:
scala> val ydb_posts2020 = spark.read.format("ydb").options(my_ydb).load("stackoverflow/posts")
ydb_posts2020: org.apache.spark.sql.DataFrame = [Id: bigint, PostTypeId: bigint ... 21 more fields]
scala> ydb_posts2020.printSchema
root
|-- Id: long (nullable = false)
|-- PostTypeId: long (nullable = true)
|-- AcceptedAnswerId: long (nullable = true)
|-- CreationDate: timestamp (nullable = true)
|-- Score: long (nullable = true)
|-- ViewCount: long (nullable = true)
|-- Body: binary (nullable = true)
|-- OwnerUserId: long (nullable = true)
|-- OwnerDisplayName: binary (nullable = true)
|-- LastEditorUserId: long (nullable = true)
|-- LastEditorDisplayName: binary (nullable = true)
|-- LastEditDate: timestamp (nullable = true)
|-- LastActivityDate: timestamp (nullable = true)
|-- Title: binary (nullable = true)
|-- Tags: binary (nullable = true)
|-- AnswerCount: long (nullable = true)
|-- CommentCount: long (nullable = true)
|-- FavoriteCount: long (nullable = true)
|-- ContentLicense: binary (nullable = true)
|-- ParentId: binary (nullable = true)
|-- CommunityOwnedDate: timestamp (nullable = true)
|-- ClosedDate: timestamp (nullable = true)
|-- Year: integer (nullable = true)
scala> ydb_posts2020.count
res3: Long = 4304021
scala> ydb_posts2020.filter(col("AcceptedAnswerId") > 0).count
res4: Long = 843780
>>> ydb_posts2020 = spark.read.format("ydb").options(**my_ydb).load("stackoverflow/posts")
>>> ydb_posts2020.printSchema()
root
|-- Id: long (nullable = true)
|-- PostTypeId: long (nullable = true)
|-- AcceptedAnswerId: long (nullable = true)
|-- CreationDate: timestamp (nullable = true)
|-- Score: long (nullable = true)
|-- ViewCount: long (nullable = true)
|-- Body: binary (nullable = true)
|-- OwnerUserId: long (nullable = true)
|-- OwnerDisplayName: binary (nullable = true)
|-- LastEditorUserId: long (nullable = true)
|-- LastEditorDisplayName: binary (nullable = true)
|-- LastEditDate: timestamp (nullable = true)
|-- LastActivityDate: timestamp (nullable = true)
|-- Title: binary (nullable = true)
|-- Tags: binary (nullable = true)
|-- AnswerCount: long (nullable = true)
|-- CommentCount: long (nullable = true)
|-- FavoriteCount: long (nullable = true)
|-- ContentLicense: binary (nullable = true)
|-- ParentId: binary (nullable = true)
|-- CommunityOwnedDate: timestamp (nullable = true)
|-- ClosedDate: timestamp (nullable = true)
|-- Year: integer (nullable = true)
>>> ydb_posts2020.count()
4304021
>>> ydb_posts2020.filter(col("AcceptedAnswerId") > 0).count()
843780
Spark SQL Example
This example uses the same dataset as the previous section: the list of all Stack Overflow posts from 2020, which can be downloaded from the following link: https://datasets-documentation.s3.eu-west-3.amazonaws.com/stackoverflow/parquet/posts/2020.parquet
First, let's run spark-sql with the configured my_ydb catalog:
~ $ spark-sql --master <master-url> --packages tech.ydb.spark:ydb-spark-connector-shaded:2.0.1 \
--conf spark.sql.catalog.my_ydb=tech.ydb.spark.connector.YdbCatalog \
--conf spark.sql.catalog.my_ydb.url=grpcs://ydb.my-host.net:2135/preprod/spark-test \
--conf spark.sql.catalog.my_ydb.auth.token.file=~/.token \
--conf spark.executor.memory=4g
spark-sql (default)>
Let's validate the current state of the connected database and confirm the absence of the stackoverflow/posts table:
spark-sql (default)> SHOW NAMESPACES FROM my_ydb;
stackoverflow
Time taken: 0.11 seconds, Fetched 1 row(s)
spark-sql (default)> SHOW TABLES FROM my_ydb.stackoverflow;
Time taken: 0.041 seconds
Let's count the number of rows in the original parquet file:
spark-sql (default)> SELECT COUNT(*) FROM parquet.`/home/username/2020.parquet`;
4304021
Let's add a new column with the year and copy it all to a new YDB table:
spark-sql (default)> CREATE TABLE my_ydb.stackoverflow.posts OPTIONS(table.primary_keys='Id') AS SELECT *, 2020 as Year FROM parquet.`/home/username/2020.parquet`;
Time taken: 85.225 seconds
Let's verify that the new table has appeared in the YDB database:
spark-sql (default)> SHOW TABLES FROM my_ydb.stackoverflow;
posts
Time taken: 0.07 seconds, Fetched 1 row(s)
spark-sql (default)> DESCRIBE TABLE my_ydb.stackoverflow.posts;
Id bigint
PostTypeId bigint
AcceptedAnswerId bigint
CreationDate timestamp
Score bigint
ViewCount bigint
Body binary
OwnerUserId bigint
OwnerDisplayName binary
LastEditorUserId bigint
LastEditorDisplayName binary
LastEditDate timestamp
LastActivityDate timestamp
Title binary
Tags binary
AnswerCount bigint
CommentCount bigint
FavoriteCount bigint
ContentLicense binary
ParentId binary
CommunityOwnedDate timestamp
ClosedDate timestamp
Year int
As a result, we can read the stored data from the YDB table and, for example, count the number of posts that have an accepted answer:
spark-sql (default)> SELECT COUNT(*) FROM my_ydb.stackoverflow.posts;
4304021
Time taken: 19.726 seconds, Fetched 1 row(s)
spark-sql (default)> SELECT COUNT(*) FROM my_ydb.stackoverflow.posts WHERE AcceptedAnswerId > 0;
843780
Time taken: 6.599 seconds, Fetched 1 row(s)