Apache Airflow™
Интеграция YDB с Apache Airflow™ позволяет автоматизировать сложные рабочие процессы и управлять ими. Apache Airflow™ предоставляет возможности для планирования задач, мониторинга их выполнения и управления зависимостями между ними — оркестрацией. Использование Airflow для оркестрации задач, таких как загрузка данных в YDB, выполнение запросов и управление транзакциями, позволяет автоматизировать и оптимизировать операционные процессы. Это особенно важно для задач ETL, где данные больших объемов требуют регулярного извлечения, преобразования и загрузки.
Для работы под управлением Apache Airflow™ YDB предоставляет пакет провайдера apache-airflow-providers-ydb. Задания Apache Airflow™ представляют собой приложения на языке Python, состоящие из набора операторов Apache Airflow™ и их зависимостей, определяющих порядок выполнения.
Установка
Для корректной работы пакета apache-airflow-providers-ydb
необходимо на всех хостах Apache Airflow™ выполнить следующие команды:
pip install ydb apache-airflow-providers-ydb
Для работы требуется Python версии не ниже, чем 3.8.
Объектная модель
Пакет airflow.providers.ydb
содержит набор компонентов для взаимодействия с YDB:
- Оператор YDBExecuteQueryOperator для интеграции задач в планировщик Apache Airflow™.
- Хук YDBHook для прямого взаимодействия с YDB.
YDBExecuteQueryOperator
Для выполнения запросов к YDB используется Apache Airflow™ оператор YDBExecuteQueryOperator
.
Обязательные аргументы:
task_id
— название задания Apache Airflow™.sql
— текст SQL-запроса, который необходимо выполнить в YDB.
Опциональные аргументы:
ydb_conn_id
— идентификатор подключения с типомYDB
, содержащий параметры соединения с YDB. Если не указан, то используется соединение с именемydb_default
. Соединениеydb_default
предустанавливается в составе Apache Airflow™, отдельно его заводить не нужно.is_ddl
— признак, что выполняется SQL DDL запрос. Если аргумент не указан, или установлен вFalse
, то будет выполняться SQL DML запрос.params
— словарь параметров запроса.
Пример:
ydb_operator = YDBExecuteQueryOperator(task_id="ydb_operator", sql="SELECT 'Hello, world!'")
В данном примере создается задание Apache Airflow™ с идентификатором ydb_operator
, которое выполняет запрос SELECT 'Hello, world!'
.
YDBHook
Для выполнения низкоуровневых команд в YDB используется Apache Airflow™ класс YDBHook
.
Опциональные аргументы:
ydb_conn_id
— идентификатор подключения с типомYDB
, содержащий параметры соединения с YDB. Если не указан, то используется соединение с именемydb_default
. Соединениеydb_default
предустанавливается в составе Apache Airflow™, отдельно его заводить не нужно.is_ddl
— признак, что выполняется SQL DDL запрос. Если аргумент не указан, или установлен вFalse
, то будет выполняться SQL DML запрос.
YDBHook
поддерживает следующие методы:
bulk_upsert
Выполняет пакетную вставку данных в таблицы YDB.
Обязательные аргументы:
table_name
— название таблицы YDB, куда будет выполняться вставка данных.rows
— массив строк для вставки.column_types
— описание типов колонок.
Пример:
hook = YDBHook(ydb_conn_id=...)
column_types = (
ydb.BulkUpsertColumns()
.add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
.add_column("name", ydb.PrimitiveType.Utf8)
.add_column("pet_type", ydb.PrimitiveType.Utf8)
.add_column("birth_date", ydb.PrimitiveType.Utf8)
.add_column("owner", ydb.PrimitiveType.Utf8)
)
rows = [
{"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
{"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
]
hook.bulk_upsert("pet", rows=rows, column_types=column_types)
В данном примере создается объект YDBHook
, через который выполняется операция пакетной вставки данных bulk_upsert
.
get_conn
Возвращает объект YDBConnection
, реализующий интерфейс DbApiConnection
для работы с данными. Класс DbApiConnection
обеспечивает стандартизированный интерфейс для взаимодействия с базой данных, позволяющий выполнять такие операции, как подключение, выполнение SQL-запросов и управление транзакциями, независимо от конкретной системы управления базами данных.
Пример:
hook = YDBHook(ydb_conn_id=...)
# Выполняем SQL-запрос и получаем курсор
connection = hook.get_conn()
cursor = connection.cursor()
cursor.execute("SELECT * from pet;")
# Извлекаем результат и имена колонок
result = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
# Закрываем курсор и соединение
cursor.close()
connection.close()
В данном примере создается объект YDBHook
, у созданного объекта запрашивается объект YDBConnection
, через который выполняется чтение данных и получение списка колонок.
Подключение к YDB
Для подключения к YDB необходимо создать новое или отредактировать существующее подключение Apache Airflow™ с типом YDB
.
Где:
Connection Id
- название подключения Apache Airflow™.Host
- протокол и адрес кластера YDB.Port
- порт для подключения к кластеру YDB.Database name
- название базы данных YDB.
Укажите реквизиты для одного из следующих способов аутентификации на кластере YDB:
Login
иPassword
- укажите реквизиты пользователя для аутентификации по логину и паролю.Service account auth JSON
- укажите значениеService Account Key
.Service account auth JSON file path
- укажите путь к файлу, содержащемуService Account Key
.IAM token
- укажите IAM токен.Use VM metadata
- указание использовать метаданные виртуальной машины.
Соответствие YQL и Python-типов
Ниже приведены правила преобразования YQL-типов в Python-результаты. Типы, не указанные в списке ниже, не поддерживаются.
Скалярные типы
YQL-тип | Python-тип | Пример в Python |
---|---|---|
Int8 , Int16 , Int32 , Uint8 , Uint16 , Uint32 , Int64 , Uint64 |
int |
647713 |
Bool |
bool |
True |
Float , float |
float NaN и Inf представляются в виде None |
7.88731023 None |
Decimal |
Decimal |
45.23410083 |
Utf8 |
str |
Текст строки |
String |
str |
Текст строки |
Сложные типы
YQL-тип | Python-тип | Пример в Python |
---|---|---|
Json , JsonDocument |
str (весь узел вставляется как строка) |
{"a":[1,2,3]} |
Date |
datetime.date |
2022-02-09 |
Datetime , Timestamp |
datetime.datetime |
2022-02-09 10:13:11 |
Опциональные типы
YQL-тип | Python-тип | Пример в Python |
---|---|---|
Optional |
Оригинальный тип или None | 1 |
Контейнеры
YQL-тип | Python-тип | Пример в Python |
---|---|---|
List<Type> |
list |
[1,2,3,4] |
Dict<KeyType, ValueType> |
dict |
{key1: "value1", key2: "value2"} |
Set<KeyType> |
set |
set(key_value1, key_value2) |
Tuple<Type1, Type2> |
tuple |
(element1, element2) |
Struct<Name:Utf8,Age:Int32> |
dict |
{ "Name": "value1", "Age": value2 } |
Специальные типы
YQL-тип | Python-тип |
---|---|
Void , Null |
None |
EmptyList |
[] |
EmptyDict |
{} |
Пример
Для выполнения запросов к YDB в составе пакета содержится оператор Apache Airflow™ YDBExecuteQueryOperator
и хук YDBHook
.
В примере ниже создается задание create_pet_table
, создающее таблицу в YDB. После успешного создания таблицы вызывается задание populate_pet_table
, заполняющее таблицу данными с помощью команд UPSERT
, и задание populate_pet_table_via_bulk_upsert
, заполняющее таблицу с помощью команд пакетной вставки данных bulk_upsert
. После выполнения вставки данных выполняется операция чтения с помощью задания get_all_pets
и задание для параметризованного чтения данных get_birth_date
.
Для выполнения запросов к базе данных YDB используется предварительно созданное соединение c YDB типа YDB Connection c именем test_ydb_connection
.
from __future__ import annotations
import datetime
import ydb
from airflow import DAG
from airflow.decorators import task
from airflow.providers.ydb.hooks.ydb import YDBHook
from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator
@task
def populate_pet_table_via_bulk_upsert():
hook = YDBHook(ydb_conn_id="test_ydb_connection")
column_types = (
ydb.BulkUpsertColumns()
.add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
.add_column("name", ydb.PrimitiveType.Utf8)
.add_column("pet_type", ydb.PrimitiveType.Utf8)
.add_column("birth_date", ydb.PrimitiveType.Utf8)
.add_column("owner", ydb.PrimitiveType.Utf8)
)
rows = [
{"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
{"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
]
hook.bulk_upsert("pet", rows=rows, column_types=column_types)
with DAG(
dag_id="ydb_demo_dag",
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
) as dag:
create_pet_table = YDBExecuteQueryOperator(
task_id="create_pet_table",
sql="""
CREATE TABLE pet (
pet_id INT,
name TEXT NOT NULL,
pet_type TEXT NOT NULL,
birth_date TEXT NOT NULL,
owner TEXT NOT NULL,
PRIMARY KEY (pet_id)
);
""",
is_ddl=True, # must be specified for DDL queries
ydb_conn_id="test_ydb_connection"
)
populate_pet_table = YDBExecuteQueryOperator(
task_id="populate_pet_table",
sql="""
UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
VALUES (1, 'Max', 'Dog', '2018-07-05', 'Jane');
UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
VALUES (2, 'Susie', 'Cat', '2019-05-01', 'Phil');
""",
ydb_conn_id="test_ydb_connection"
)
get_all_pets = YDBExecuteQueryOperator(task_id="get_all_pets", sql="SELECT * FROM pet;", ydb_conn_id="test_ydb_connection")
get_birth_date = YDBExecuteQueryOperator(
task_id="get_birth_date",
sql="SELECT * FROM pet WHERE birth_date BETWEEN 'not_var{{params.begin_date}}' AND 'not_var{{params.end_date}}'",
params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
ydb_conn_id="test_ydb_connection"
)
(
create_pet_table
>> populate_pet_table
>> populate_pet_table_via_bulk_upsert()
>> get_all_pets
>> get_birth_date
)