Импорт и экспорт данных в колоночные таблицы
В колоночных таблицах YDB в настоящий момент отсутствует (точнее находится в разработке) встроенный механизм резервного копирования и восстановления (backup/restore). Для миграции данных или их восстановления после сбоев рекомендуется использовать операции экспорта и импорта.
Доступны два основных подхода:
-
Экспорт и импорт через федеративные запросы к объектным хранилищам (например, Object Storage любое другое S3-совместимое хранилище).
Преимущества: Использование встроенного функционала YDB, отсутствие необходимости в сторонних сервисах или инструментах.
Ограничения: Данный метод позволяет экспортировать данные только в объектные хранилища. -
Экспорт и импорт через Apache Spark™ — гибкий способ для работы с большими объемами данных.
Преимущества: Поддержка широкого спектра целевых хранилищ
Ограничения: Требуется установка и настройка дополнительного ПО (Apache Spark™).
Экспорт и импорт через федеративные запросы к Object Storage
Федеративные запросы позволяют YDB напрямую читать и записывать данные в файлах форматов Parquet или CSV. Этот метод удобен для выполнения операций экспорта и импорта непосредственно средствами SQL без использования внешних инструментов.
Предварительные требования
- Объектное хранилище (Object Storage) и статический ключ доступа, например, Yandex Object Storage с заранее созданным бакетом (например,
your-bucket). - Настроенный сетевой доступ с узлов кластера YDB к объектному хранилищу. В примере используется endpoint
storage.yandexcloud.net— необходимо обеспечить доступ к нему по порту 443. - В примерах используются данные теста производительности TPC-H. Инструкция по загрузке тестовых данных доступна в соответствующем разделе руководства.
Создайте секрет для доступа к Object Storage
Для подключения к приватному бакету необходимо использовать аутентификацию по статическим ключам доступа. В YDB эти ключи хранятся в виде SECRET объектов.
CREATE OBJECT aws_access_id (TYPE SECRET) WITH (value='<ID_ключа>');
CREATE OBJECT aws_access_key (TYPE SECRET) WITH (value='<секретный_ключ>');
Где:
aws_access_id— имя секрета, содержащего ID_ключа.<ID_ключа>— идентификатор статического ключа доступа.aws_access_key— имя секрета, содержащего секретный ключ.<секретный_ключ>— секретная часть ключа доступа.- LOCATION — endpoint выбранного Object Storage
- AWS_REGION — регион выбранного Object Storage
Настройка подключения
Далее необходимо настроить подключение к бакету, создав внешний источник данных и внешнюю таблицу со схемой, идентичной lineitem.
-- Создание источника данных, указывающего на бакет и использующего секрет
CREATE EXTERNAL DATA SOURCE `external/backup_datasource` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="https://storage.yandexcloud.net/<bucket_name>/",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="aws_access_id",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="aws_access_key",
AWS_REGION="ru-central1"
);
Где:
external/backup_datasource— имя создаваемого внешнего источника данных.LOCATION— URL бакета, включая название бакета<bucket_name>AUTH_METHOD="AWS"— метод аутентификации, совместимый с S3 API.AWS_ACCESS_KEY_ID_SECRET_NAME,AWS_SECRET_ACCESS_KEY_SECRET_NAME— имена секретов, используемых для аутентификации в Object Storage.
-- Создание внешней таблицы со схемой lineitem
CREATE EXTERNAL TABLE `external/backup/lineitem_sql` (
l_orderkey Int64 NOT NULL,
l_partkey Int32 NOT NULL,
l_suppkey Int32 NOT NULL,
l_linenumber Int32 NOT NULL,
l_quantity Double NOT NULL,
l_extendedprice Double NOT NULL,
l_discount Double NOT NULL,
l_tax Double NOT NULL,
l_returnflag String NOT NULL,
l_linestatus String NOT NULL,
l_shipdate Date NOT NULL,
l_commitdate Date NOT NULL,
l_receiptdate Date NOT NULL,
l_shipinstruct String NOT NULL,
l_shipmode String NOT NULL,
l_comment String NOT NULL
) WITH (
DATA_SOURCE="external/backup_datasource",
LOCATION="/ydb-dumps-sql/lineitem/",
FORMAT="parquet"
);
Где:
LOCATION— путь к директории с данными внутри бакета.DATA_SOURCE— название объектаEXTERNAL DATA SOURCE, содержащего параметры подключения.external/backup/lineitem_sql— полное имя создаваемой внешней таблицы.
Экспорт данных из YDB
Для экспорта данных из таблицы tpch/s10/lineitem в Object Storage используется INSERT INTO ... SELECT во внешнюю таблицу.
INSERT INTO `external/backup/lineitem_sql`
SELECT * FROM `tpch/s10/lineitem`;
После выполнения этого запроса в бакете your-bucket по пути /ydb-dumps-sql/lineitem/ появятся Parquet-файлы с данными.
Импорт данных в YDB
Примечание
Команда INSERT может завершиться с ошибкой, если таблица, куда вы восстанавливаете данные, уже содержит записи. В этом случае нужно очистить целевую таблицу и повторить команду INSERT.
Для импорта данных из Object Storage в обратно в таблицу tpch/s10/lineitem используется INSERT INTO ... SELECT из внешней таблицы.
INSERT INTO `tpch/s10/lineitem`
SELECT * FROM `external/backup/lineitem_sql`;
Здесь tpch/s10/lineitem — это имя целевой таблицы в YDB, в которую будут загружены данные.
Экспорт и импорт с помощью Apache Spark™
Использование коннектора YDB для Apache Spark™ является гибким и масштабируемым решением для экспорта и импорта больших объемов данных.
Предварительные требования
- Установленный PySpark версии 4.0.1, установить можно по инструкции.
- Наличие gRPC-эндпоинта для подключения к базе данных YDB.
- Реквизиты доступа YDB с правами на чтение/запись.
- Настроенный сетевой доступ с узлов кластера YDB к объектному хранилищу. В примере используется endpoint
storage.yandexcloud.net— необходимо обеспечить доступ к нему по порту 443. - В примерах используются данные теста производительности TPC-H. Инструкция по загрузке тестовых данных доступна в соответствующем разделе руководства.
Экспорт данных из YDB в Parquet
Используемые параметры:
spark.jars.packages— конфигурационный параметр Maven, который загрузит коннектор YDB для Spark, а также другие необходимые компоненты.S3_ENDPOINT— эндпоинт S3-совместимого хранилища (для Yandex Object Storage используйтеhttps://storage.yandexcloud.net).S3_ACCESS_KEY— ID статического ключа для доступа к S3.S3_SECRET_KEY— секретная часть ключа для доступа к S3.YDB_HOSTNAME— хост gRPC-эндпоинта (например,ydb.serverless.yandexcloud.net).YDB_PORT— порт gRPC-эндпоинта (например,2135).YDB_DATABASE_NAME— путь к вашей базе данных (например,/ru-central1/b1g.../etn...).YDB_AUTH_TYPE— параметры для аутентификации в YDB, поддерживаемые драйвером Apache Spark.YDB_SOURCE_TABLE— Путь к исходной таблице в базе источнике (например,tpch/s1/lineitem).
from pyspark.sql import SparkSession
#Настройки источника
YDB_HOSTNAME = ""
YDB_PORT = ""
YDB_DATABASE_NAME = ""
YDB_AUTH_TYPE = ""
YDB_SOURCE_TABLE = ""
#Настройки назначения
S3_ENDPOINT = ""
S3_ACCESS_KEY = ""
S3_SECRET_KEY = ""
S3_BUCKET_NAME = ""
spark = (SparkSession.builder
.appName("ydb-export-lineitem-to-parquet")
.config("spark.jars.packages", "tech.ydb.spark:ydb-spark-connector-shaded:2.0.1,org.apache.hadoop:hadoop-aws:3.3.6,com.amazonaws:aws-java-sdk-bundle:1.12.662")
# Конфигурация S3-коннектора
.config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
.config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
.config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
.config("spark.hadoop.fs.s3a.threads.keepalivetime", "60000")
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
.config("spark.hadoop.fs.s3a.connection.establish.timeout", "30000") # 30s
.config("spark.hadoop.fs.s3a.connection.timeout", "200000") # 200s
.config("spark.hadoop.fs.s3a.threads.keepalivetime", "60000") # 60s
.config("spark.hadoop.fs.s3a.connection.ttl", "300000") # 5m
.config("spark.hadoop.fs.s3a.assumed.role.session.duration", "1800000")# 30m
.config("spark.hadoop.fs.s3a.multipart.purge.age", "86400000") # 24h
.config("spark.hadoop.fs.s3a.retry.interval", "500") # 500ms
.config("spark.hadoop.fs.s3a.retry.throttle.interval", "100") # 100ms
.getOrCreate())
# Чтение данных из таблицы lineitem
df = (spark.read.format("ydb")
.option("url", f"grpcs://{YDB_HOSTNAME}:{YDB_PORT}{YDB_DATABASE_NAME}?{YDB_AUTH_TYPE}")
.load(YDB_SOURCE_TABLE))
# Запись данных в Parquet-файлы в S3
(df.repartition(64)
.write.mode("overwrite")
.option("compression", "snappy")
.parquet(f"s3a://{S3_BUCKET_NAME}/ydb-dumps-spark/lineitem/"))
spark.stop()
Импорт данных из Parquet в YDB
spark.jars.packages— конфигурационный параметр Maven, который загрузит коннектор YDB для Spark, а также другие необходимые компоненты.S3_ENDPOINT— эндпоинт S3-совместимого хранилища (для Yandex Object Storage используйтеhttps://storage.yandexcloud.net).S3_ACCESS_KEY— ID статического ключа для доступа к S3.S3_SECRET_KEY— секретная часть ключа для доступа к S3.YDB_HOSTNAME— хост gRPC-эндпоинта (например,ydb.serverless.yandexcloud.net).YDB_PORT— порт gRPC-эндпоинта (например,2135).YDB_DATABASE_NAME— путь к вашей базе данных (например,/ru-central1/b1g.../etn...).YDB_AUTH_TYPE— параметры для аутентификации в YDB, поддерживаемые драйвером Apache Spark.YDB_TARGET_TABLE— Путь к таблице в базе назначения (например,tpch/s1/lineitem).
from pyspark.sql import SparkSession
#Настройки источника
S3_ENDPOINT = "https://storage.yandexcloud.net"
S3_ACCESS_KEY = ""
S3_SECRET_KEY = ""
S3_BUCKET_NAME = ""
S3_FOLDER_PATH = ""
#Настройки назначения
YDB_HOSTNAME = ""
YDB_PORT = ""
YDB_DATABASE_NAME = ""
YDB_AUTH_TYPE = ""
YDB_TARGET_TABLE = ""
spark = (SparkSession.builder
.appName("ydb-import-lineitem-from-parquet")
.config("spark.jars.packages", "tech.ydb.spark:ydb-spark-connector-shaded:2.0.1,org.apache.hadoop:hadoop-aws:3.3.6,com.amazonaws:aws-java-sdk-bundle:1.12.662")
# Конфигурация S3-коннектора (аналогично экспорту)
.config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT)
.config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
.config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
.config("spark.hadoop.fs.s3a.threads.keepalivetime", "60000")
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
.config("spark.hadoop.fs.s3a.connection.establish.timeout", "30000") # 30s
.config("spark.hadoop.fs.s3a.connection.timeout", "200000") # 200s
.config("spark.hadoop.fs.s3a.threads.keepalivetime", "60000") # 60s
.config("spark.hadoop.fs.s3a.connection.ttl", "300000") # 5m
.config("spark.hadoop.fs.s3a.assumed.role.session.duration", "1800000")# 30m
.config("spark.hadoop.fs.s3a.multipart.purge.age", "86400000") # 24h
.config("spark.hadoop.fs.s3a.retry.interval", "500") # 500ms
.config("spark.hadoop.fs.s3a.retry.throttle.interval", "100") # 100ms
.getOrCreate())
# Чтение данных из Parquet-файлов в S3, созданных на шаге экспорта
df = spark.read.parquet(f"s3a://{S3_BUCKET_NAME}/{S3_FOLDER_PATH}")
# Запись данных в принимающую таблицу
(df.write.format("ydb")
.option("url", f"grpcs://{YDB_HOSTNAME}:{YDB_PORT}{YDB_DATABASE_NAME}?{YDB_AUTH_TYPE}")
.mode("append")
.save(YDB_TARGET_TABLE))
spark.stop()