Apache Airflow™

Integrating YDB with Apache Airflow™ allows you to automate and manage complex workflows. Apache Airflow™ provides features for scheduling tasks, monitoring their execution, and managing dependencies between them — in other words, orchestration. Using Airflow to orchestrate tasks such as uploading data to YDB, executing queries, and managing transactions lets you automate and optimize operational processes. This is especially important for ETL workloads, where large volumes of data require regular extraction, transformation, and loading.

The YDB provider package apache-airflow-providers-ydb allows you to work with YDB from Apache Airflow™. Apache Airflow™ tasks are Python applications consisting of a set of Apache Airflow™ operators and their dependencies, which define the order of execution.

Setup

Execute the following command on all Apache Airflow™ hosts to install the apache-airflow-providers-ydb package:

pip install ydb apache-airflow-providers-ydb

Python version 3.8 or higher is required.

Object model

The airflow.providers.ydb package contains a set of components for interacting with YDB:

YDBExecuteQueryOperator

To make requests to YDB, use the Apache Airflow™ operator YDBExecuteQueryOperator.

Required arguments

  • task_id — the name of the Apache Airflow™ task.
  • sql — the text of the SQL query to be executed in YDB.

Optional arguments

  • ydb_conn_id — the connection identifier with the YDB type, containing the connection parameters for YDB. If omitted, a connection named ydb_default is used. The ydb_default connection is preinstalled as part of Apache Airflow™ and does not need to be configured separately.
  • is_ddl — indicates that the SQL statement is DDL. If omitted or set to False, the statement is treated as DML.
  • params — a dictionary of query parameters.

Example:

ydb_operator = YDBExecuteQueryOperator(task_id="ydb_operator", sql="SELECT 'Hello, world!'")

In this example, an Apache Airflow™ task is created with the ID ydb_operator, which executes the query SELECT 'Hello, world!'.

YDBHook

The Apache Airflow™ class YDBHook is used to execute low-level commands in YDB.

Optional arguments

  • ydb_conn_id — the connection identifier with the YDB type, containing the connection parameters for YDB. If omitted, a connection named ydb_default is used. The ydb_default connection is preinstalled as part of Apache Airflow™ and does not need to be configured separately.
  • is_ddl — indicates that the SQL statement is DDL. If omitted or set to False, the statement is treated as DML.

YDBHook supports the following methods:

bulk_upsert

Performs batch data insertion into YDB tables.

Required arguments

  • table_name — the name of the YDB table where the data will be inserted.
  • rows — an array of rows to insert.
  • column_types — a description of column types.

Example:

hook = YDBHook(ydb_conn_id=...)
column_types = (
    ydb.BulkUpsertColumns()
    .add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
    .add_column("name", ydb.PrimitiveType.Utf8)
    .add_column("pet_type", ydb.PrimitiveType.Utf8)
    .add_column("birth_date", ydb.PrimitiveType.Utf8)
    .add_column("owner", ydb.PrimitiveType.Utf8)
)

rows = [
    {"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
    {"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
]
hook.bulk_upsert("pet", rows=rows, column_types=column_types)

In this example, a YDBHook object is created, through which the bulk_upsert batch data insertion operation is performed.

get_conn

Returns the YDBConnection object, which implements the DbApiConnection interface for working with data. The DbApiConnection class provides a standardized interface for interacting with the database, allowing operations such as establishing connections, executing SQL queries, and managing transactions, regardless of the specific database management system.

Example:

hook = YDBHook(ydb_conn_id=...)

# Execute the SQL query and get the cursor
connection = hook.get_conn()
cursor = connection.cursor()
cursor.execute("SELECT * FROM pet;")

# Extract the result and column names
result = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]

# Close cursor and connection
cursor.close()
connection.close()

In this example, a YDBHook object is created, and a YDBConnection object is requested from the created object. This connection is then used to read data and retrieve a list of columns.
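Because YDBConnection implements the standard Python DB-API, the same cursor pattern works with any DB-API connection. The sketch below illustrates the identical calls using the standard-library sqlite3 module standing in for YDB; the table and data here are made up purely for the illustration:

```python
import sqlite3

# sqlite3 stands in for YDB here: both expose the DB-API
# connection/cursor interface returned by hook.get_conn().
connection = sqlite3.connect(":memory:")
cursor = connection.cursor()
cursor.execute("CREATE TABLE pet (pet_id INT PRIMARY KEY, name TEXT)")
cursor.execute("INSERT INTO pet VALUES (1, 'Max'), (2, 'Susie')")

# The same calls as in the YDBConnection example above
cursor.execute("SELECT * FROM pet;")
result = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]

# Close cursor and connection
cursor.close()
connection.close()

print(columns)  # ['pet_id', 'name']
print(result)   # [(1, 'Max'), (2, 'Susie')]
```

Swapping sqlite3.connect(...) for hook.get_conn() is the only change needed to run the same fetch logic against YDB.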

Connection to YDB

To connect to YDB, create a new Apache Airflow™ connection with the YDB type or edit an existing one.

The connection parameters are:

  • Connection Id — the Apache Airflow™ connection identifier.
  • Host — the protocol and cluster address of YDB.
  • Port — the port of YDB.
  • Database name — the name of the YDB database.

Specify the details for one of the following authentication methods on the YDB cluster:

  • Login and Password — specify user credentials for using static credentials.
  • Service account auth JSON — specify the value of the Service Account Key.
  • Service account auth JSON file path — specify the path to the Service Account Key file.
  • IAM token — specify the IAM token.
  • Use VM metadata — enable this option to use virtual machine metadata.
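Connections can also be created without the web UI, for example with the Airflow CLI. The sketch below is illustrative: the connection name, host, and credentials are placeholders, and the exact mapping of the database name to the connection's extra field may differ between provider versions.

```shell
# Hypothetical example: creates a connection named test_ydb_connection.
# Host carries the protocol and cluster address, as in the UI form.
airflow connections add test_ydb_connection \
    --conn-type ydb \
    --conn-host grpcs://ydb.example.com \
    --conn-port 2135 \
    --conn-login my_user \
    --conn-password my_password \
    --conn-extra '{"database": "/my/database/path"}'
```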

Matching between YQL and Python types

Below are the rules for converting YQL types to Python results. Types not listed below are not supported.

Scalar types

| YQL type | Python type | Example in Python |
|----------|-------------|-------------------|
| Int8, Int16, Int32, Uint8, Uint16, Uint32, Int64, Uint64 | int | 647713 |
| Bool | bool | True |
| Float, Double | float (NaN and Inf are represented as None) | 7.88731023, None |
| Decimal | Decimal | 45.23410083 |
| Utf8 | str | "Text of string" |
| String | str | "Text of string" |

Complex types

| YQL type | Python type | Example in Python |
|----------|-------------|-------------------|
| Json, JsonDocument | str (the entire node is inserted as a string) | {"a":[1,2,3]} |
| Date | datetime.date | 2022-02-09 |
| Datetime, Timestamp | datetime.datetime | 2022-02-09 10:13:11 |

Optional types

| YQL type | Python type | Example in Python |
|----------|-------------|-------------------|
| Optional | Original type or None | 1 |

Containers

| YQL type | Python type | Example in Python |
|----------|-------------|-------------------|
| List<Type> | list | [1,2,3,4] |
| Dict<KeyType, ValueType> | dict | {"key1": "value1", "key2": "value2"} |
| Set<KeyType> | set | {"key_value1", "key_value2"} |
| Tuple<Type1, Type2> | tuple | (element1, element2) |
| Struct<Name:Utf8, Age:Int32> | dict | {"Name": "value1", "Age": value2} |

Special types

| YQL type | Python type |
|----------|-------------|
| Void, Null | None |
| EmptyList | [] |
| EmptyDict | {} |
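The mapping rules above can be checked against plain Python values. The sketch below builds a result row shaped the way these conversion rules describe; the column names and values are illustrative, not produced by an actual YDB query:

```python
import datetime
from decimal import Decimal

# A result row shaped according to the conversion rules above
row = {
    "pet_id": 3,                               # Int32 -> int
    "alive": True,                             # Bool -> bool
    "weight": 7.88731023,                      # Double -> float
    "price": Decimal("45.23410083"),           # Decimal -> Decimal
    "name": "Lester",                          # Utf8 -> str
    "birth_date": datetime.date(2020, 6, 23),  # Date -> datetime.date
    "tags": ["small", "rodent"],               # List<Utf8> -> list
    "attrs": {"color": "brown"},               # Dict<Utf8, Utf8> -> dict
    "owner": None,                             # Optional<Utf8> with no value -> None
}

assert isinstance(row["pet_id"], int)
assert isinstance(row["price"], Decimal)
assert isinstance(row["birth_date"], datetime.date)
```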

Example

To make requests to YDB, the package provides the Apache Airflow™ operator YDBExecuteQueryOperator and the hook YDBHook.

In the example below, a create_pet_table task is launched to create a table in YDB. After the table is successfully created, the populate_pet_table task runs to populate the table with data using UPSERT commands. Additionally, the populate_pet_table_via_bulk_upsert task fills the table using bulk_upsert. After data insertion, a read operation is performed using the get_all_pets task and the get_birth_date task for parameterized data reading.

To execute queries in YDB, a pre-created connection of type YDB Connection named test_ydb_connection is used.

from __future__ import annotations

import datetime

import ydb
from airflow import DAG
from airflow.decorators import task
from airflow.providers.ydb.hooks.ydb import YDBHook
from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator

@task
def populate_pet_table_via_bulk_upsert():
    hook = YDBHook(ydb_conn_id="test_ydb_connection")
    column_types = (
        ydb.BulkUpsertColumns()
        .add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
        .add_column("name", ydb.PrimitiveType.Utf8)
        .add_column("pet_type", ydb.PrimitiveType.Utf8)
        .add_column("birth_date", ydb.PrimitiveType.Utf8)
        .add_column("owner", ydb.PrimitiveType.Utf8)
    )

    rows = [
        {"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
        {"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
    ]
    hook.bulk_upsert("pet", rows=rows, column_types=column_types)


with DAG(
    dag_id="ydb_demo_dag",
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
) as dag:
    create_pet_table = YDBExecuteQueryOperator(
        task_id="create_pet_table",
        sql="""
            CREATE TABLE pet (
            pet_id INT,
            name TEXT NOT NULL,
            pet_type TEXT NOT NULL,
            birth_date TEXT NOT NULL,
            owner TEXT NOT NULL,
            PRIMARY KEY (pet_id)
            );
        """,
        is_ddl=True,  # must be specified for DDL queries
        ydb_conn_id="test_ydb_connection"
    )

    populate_pet_table = YDBExecuteQueryOperator(
        task_id="populate_pet_table",
        sql="""
              UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
              VALUES (1, 'Max', 'Dog', '2018-07-05', 'Jane');

              UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
              VALUES (2, 'Susie', 'Cat', '2019-05-01', 'Phil');
            """,
        ydb_conn_id="test_ydb_connection"
    )

    get_all_pets = YDBExecuteQueryOperator(task_id="get_all_pets", sql="SELECT * FROM pet;", ydb_conn_id="test_ydb_connection")

    get_birth_date = YDBExecuteQueryOperator(
        task_id="get_birth_date",
        sql="SELECT * FROM pet WHERE birth_date BETWEEN '{{ params.begin_date }}' AND '{{ params.end_date }}'",
        params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
        ydb_conn_id="test_ydb_connection"
    )

    (
        create_pet_table
        >> populate_pet_table
        >> populate_pet_table_via_bulk_upsert()
        >> get_all_pets
        >> get_birth_date
    )