Apache Airflow™

Интеграция YDB с Apache Airflow™ позволяет автоматизировать сложные рабочие процессы и управлять ими. Apache Airflow™ предоставляет возможности для планирования задач, мониторинга их выполнения и управления зависимостями между ними — оркестрацией. Использование Airflow для оркестрации задач, таких как загрузка данных в YDB, выполнение запросов и управление транзакциями, позволяет автоматизировать и оптимизировать операционные процессы. Это особенно важно для задач ETL, где данные больших объемов требуют регулярного извлечения, преобразования и загрузки.

Для работы под управлением Apache Airflow™ YDB предоставляет пакет провайдера apache-airflow-providers-ydb. Задания Apache Airflow™ представляют собой приложения на языке Python, состоящие из набора операторов Apache Airflow™ и их зависимостей, определяющих порядок выполнения.

Установка

Для корректной работы пакета apache-airflow-providers-ydb необходимо на всех хостах Apache Airflow™ выполнить следующие команды:

pip install ydb apache-airflow-providers-ydb

Для работы требуется Python версии не ниже, чем 3.8.

Объектная модель

Пакет airflow.providers.ydb содержит набор компонентов для взаимодействия с YDB:

  • Оператор YDBExecuteQueryOperator для интеграции задач в планировщик Apache Airflow™.
  • Хук YDBHook для прямого взаимодействия с YDB.

YDBExecuteQueryOperator

Для выполнения запросов к YDB используется Apache Airflow™ оператор YDBExecuteQueryOperator.

Обязательные аргументы:

  • task_id — название задания Apache Airflow™.
  • sql — текст SQL-запроса, который необходимо выполнить в YDB.

Опциональные аргументы:

  • ydb_conn_id — идентификатор подключения с типом YDB, содержащий параметры соединения с YDB. Если не указан, то используется соединение с именем ydb_default. Соединение ydb_default предустанавливается в составе Apache Airflow™, отдельно его заводить не нужно.
  • is_ddl — признак, что выполняется SQL DDL запрос. Если аргумент не указан, или установлен в False, то будет выполняться SQL DML запрос.
  • params — словарь параметров запроса.

Пример:

ydb_operator = YDBExecuteQueryOperator(task_id="ydb_operator", sql="SELECT 'Hello, world!'")

В данном примере создается задание Apache Airflow™ с идентификатором ydb_operator, которое выполняет запрос SELECT 'Hello, world!'.

YDBHook

Для выполнения низкоуровневых команд в YDB используется Apache Airflow™ класс YDBHook.

Опциональные аргументы:

  • ydb_conn_id — идентификатор подключения с типом YDB, содержащий параметры соединения с YDB. Если не указан, то используется соединение с именем ydb_default. Соединение ydb_default предустанавливается в составе Apache Airflow™, отдельно его заводить не нужно.
  • is_ddl — признак, что выполняется SQL DDL запрос. Если аргумент не указан, или установлен в False, то будет выполняться SQL DML запрос.

YDBHook поддерживает следующие методы:

bulk_upsert

Выполняет пакетную вставку данных в таблицы YDB.

Обязательные аргументы:

  • table_name — название таблицы YDB, куда будет выполняться вставка данных.
  • rows — массив строк для вставки.
  • column_types — описание типов колонок.

Пример:

hook = YDBHook(ydb_conn_id=...)
column_types = (
        ydb.BulkUpsertColumns()
        .add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
        .add_column("name", ydb.PrimitiveType.Utf8)
        .add_column("pet_type", ydb.PrimitiveType.Utf8)
        .add_column("birth_date", ydb.PrimitiveType.Utf8)
        .add_column("owner", ydb.PrimitiveType.Utf8)
    )

rows = [
    {"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
    {"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
]
hook.bulk_upsert("pet", rows=rows, column_types=column_types)

В данном примере создается объект YDBHook, через который выполняется операция пакетной вставки данных bulk_upsert.

get_conn

Возвращает объект YDBConnection, реализующий интерфейс DbApiConnection для работы с данными. Класс DbApiConnection обеспечивает стандартизированный интерфейс для взаимодействия с базой данных, позволяющий выполнять такие операции, как подключение, выполнение SQL-запросов и управление транзакциями, независимо от конкретной системы управления базами данных.

Пример:

hook = YDBHook(ydb_conn_id=...)

# Выполняем SQL-запрос и получаем курсор
connection = hook.get_conn()
cursor = connection.cursor()
cursor.execute("SELECT * from pet;")

# Извлекаем результат и имена колонок
result = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]

# Закрываем курсор и соединение
cursor.close()
connection.close()

В данном примере создается объект YDBHook, у созданного объекта запрашивается объект YDBConnection, через который выполняется чтение данных и получение списка колонок.

Подключение к YDB

Для подключения к YDB необходимо создать новое или отредактировать существующее подключение Apache Airflow™ с типом YDB.

Где:

  • Connection Id - название подключения Apache Airflow™.
  • Host - протокол и адрес кластера YDB.
  • Port - порт для подключения к кластеру YDB.
  • Database name - название базы данных YDB.

Укажите реквизиты для одного из следующих способов аутентификации на кластере YDB:

Соответствие YQL и Python-типов

Ниже приведены правила преобразования YQL-типов в Python-результаты. Типы, не указанные в списке ниже, не поддерживаются.

Скалярные типы

YQL-тип Python-тип Пример в Python
Int8, Int16, Int32, Uint8, Uint16, Uint32, Int64, Uint64 int 647713
Bool bool True
Float, float float
NaN и Inf представляются в виде None
7.88731023
None
Decimal Decimal 45.23410083
Utf8 str Текст строки
String str Текст строки

Сложные типы

YQL-тип Python-тип Пример в Python
Json, JsonDocument str (весь узел вставляется как строка) {"a":[1,2,3]}
Date datetime.date 2022-02-09
Datetime, Timestamp datetime.datetime 2022-02-09 10:13:11

Опциональные типы

YQL-тип Python-тип Пример в Python
Optional Оригинальный тип или None 1

Контейнеры

YQL-тип Python-тип Пример в Python
List<Type> list [1,2,3,4]
Dict<KeyType, ValueType> dict {key1: "value1", key2: "value2"}
Set<KeyType> set set(key_value1, key_value2)
Tuple<Type1, Type2> tuple (element1, element2)
Struct<Name:Utf8,Age:Int32> dict { "Name": "value1", "Age": value2 }

Специальные типы

YQL-тип Python-тип
Void, Null None
EmptyList []
EmptyDict {}

Пример

Для выполнения запросов к YDB в составе пакета содержится оператор Apache Airflow™ YDBExecuteQueryOperator и хук YDBHook.

В примере ниже создается задание create_pet_table, создающее таблицу в YDB. После успешного создания таблицы вызывается задание populate_pet_table, заполняющее таблицу данными с помощью команд UPSERT, и задание populate_pet_table_via_bulk_upsert, заполняющее таблицу с помощью команд пакетной вставки данных bulk_upsert. После выполнения вставки данных выполняется операция чтения с помощью задания get_all_pets и задание для параметризованного чтения данных get_birth_date.

Для выполнения запросов к базе данных YDB используется предварительно созданное соединение c YDB типа YDB Connection c именем test_ydb_connection.

from __future__ import annotations

import datetime

import ydb
from airflow import DAG
from airflow.decorators import task
from airflow.providers.ydb.hooks.ydb import YDBHook
from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator

@task
def populate_pet_table_via_bulk_upsert():
    hook = YDBHook(ydb_conn_id="test_ydb_connection")
    column_types = (
        ydb.BulkUpsertColumns()
        .add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
        .add_column("name", ydb.PrimitiveType.Utf8)
        .add_column("pet_type", ydb.PrimitiveType.Utf8)
        .add_column("birth_date", ydb.PrimitiveType.Utf8)
        .add_column("owner", ydb.PrimitiveType.Utf8)
    )

    rows = [
        {"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
        {"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
    ]
    hook.bulk_upsert("pet", rows=rows, column_types=column_types)


with DAG(
    dag_id="ydb_demo_dag",
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
) as dag:
    create_pet_table = YDBExecuteQueryOperator(
        task_id="create_pet_table",
        sql="""
            CREATE TABLE pet (
            pet_id INT,
            name TEXT NOT NULL,
            pet_type TEXT NOT NULL,
            birth_date TEXT NOT NULL,
            owner TEXT NOT NULL,
            PRIMARY KEY (pet_id)
            );
          """,
        is_ddl=True,  # must be specified for DDL queries
        ydb_conn_id="test_ydb_connection"
    )

    populate_pet_table = YDBExecuteQueryOperator(
        task_id="populate_pet_table",
        sql="""
              UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
              VALUES (1, 'Max', 'Dog', '2018-07-05', 'Jane');

              UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
              VALUES (2, 'Susie', 'Cat', '2019-05-01', 'Phil');
            """,
        ydb_conn_id="test_ydb_connection"
    )

    get_all_pets = YDBExecuteQueryOperator(task_id="get_all_pets", sql="SELECT * FROM pet;", ydb_conn_id="test_ydb_connection")

    get_birth_date = YDBExecuteQueryOperator(
        task_id="get_birth_date",
        sql="SELECT * FROM pet WHERE birth_date BETWEEN 'not_var{{params.begin_date}}' AND 'not_var{{params.end_date}}'",
        params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
        ydb_conn_id="test_ydb_connection"
    )

    (
        create_pet_table
        >> populate_pet_table
        >> populate_pet_table_via_bulk_upsert()
        >> get_all_pets
        >> get_birth_date
    )
Предыдущая