# Apache Airflow™
Integrating YDB with Apache Airflow™ allows you to automate and manage complex workflows. Apache Airflow™ provides capabilities for scheduling tasks, monitoring their execution, and managing dependencies between them, that is, orchestration. Using Airflow to orchestrate tasks such as uploading data to YDB, executing queries, and managing transactions lets you automate and optimize operational processes. This is especially important for ETL tasks, where large amounts of data require regular extraction, transformation, and loading.
The YDB provider package `apache-airflow-providers-ydb` lets you work with YDB from Apache Airflow™. Apache Airflow™ tasks are Python applications consisting of a set of Apache Airflow™ operators and their dependencies, which define the order of execution.
## Setup
Execute the following command on all Apache Airflow™ hosts to install the `apache-airflow-providers-ydb` package:

```bash
pip install ydb apache-airflow-providers-ydb
```
Python version 3.8 or higher is required.
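To check that the provider is available to your Airflow installation, you can try importing its operator; this is just a quick sanity check, not an official verification step:

```bash
python -c "from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator; print('OK')"
```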
## Object model
The `airflow.providers.ydb` package contains a set of components for interacting with YDB:

- Operator `YDBExecuteQueryOperator` for integrating tasks into the Apache Airflow™ scheduler.
- Hook `YDBHook` for direct interaction with YDB.
## YDBExecuteQueryOperator

To make requests to YDB, use the Apache Airflow™ operator `YDBExecuteQueryOperator`.
### Required arguments

- `task_id` — the name of the Apache Airflow™ task.
- `sql` — the text of the SQL query to be executed in YDB.
### Optional arguments

- `ydb_conn_id` — the identifier of a connection with the `YDB` type, containing the connection parameters for YDB. If omitted, a connection named `ydb_default` is used. The `ydb_default` connection is preinstalled as part of Apache Airflow™ and does not need to be configured separately.
- `is_ddl` — indicates that a SQL DDL statement is being executed. If omitted or set to `False`, a SQL DML statement is executed.
- `params` — a dictionary of query parameters.
Example:

```python
ydb_operator = YDBExecuteQueryOperator(task_id="ydb_operator", sql="SELECT 'Hello, world!'")
```
In this example, an Apache Airflow™ task with the ID `ydb_operator` is created, which executes the query `SELECT 'Hello, world!'`.
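Values from `params` are substituted into `sql` through Airflow's standard Jinja templating, as the full example at the end of this article shows. Below is a minimal sketch of a parameterized task; the connection ID `my_ydb_connection` and the `pet` table are illustrative assumptions:

```python
from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator

# A sketch of a parameterized query; "my_ydb_connection" and the "pet" table
# are assumptions used for illustration only.
get_recent_pets = YDBExecuteQueryOperator(
    task_id="get_recent_pets",
    # Airflow renders the Jinja template before the query is sent to YDB
    sql="SELECT * FROM pet WHERE birth_date >= '{{ params.since }}'",
    params={"since": "2020-01-01"},
    ydb_conn_id="my_ydb_connection",
)
```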
## YDBHook

The Apache Airflow™ class `YDBHook` is used to execute low-level commands in YDB.
### Optional arguments

- `ydb_conn_id` — the identifier of a connection with the `YDB` type, containing the connection parameters for YDB. If omitted, a connection named `ydb_default` is used. The `ydb_default` connection is preinstalled as part of Apache Airflow™ and does not need to be configured separately.
- `is_ddl` — indicates that a SQL DDL statement is being executed. If omitted or set to `False`, a SQL DML statement is executed.
`YDBHook` supports the following methods:
### bulk_upsert

Performs batch data insertion into YDB tables.
#### Required arguments

- `table_name` — the name of the YDB table where the data will be inserted.
- `rows` — an array of rows to insert.
- `column_types` — a description of column types.
Example:
hook = YDBHook(ydb_conn_id=...)
column_types = (
ydb.BulkUpsertColumns()
.add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
.add_column("name", ydb.PrimitiveType.Utf8)
.add_column("pet_type", ydb.PrimitiveType.Utf8)
.add_column("birth_date", ydb.PrimitiveType.Utf8)
.add_column("owner", ydb.PrimitiveType.Utf8)
)
rows = [
{"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
{"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
]
hook.bulk_upsert("pet", rows=rows, column_types=column_types)
In this example, a `YDBHook` object is created, and the `bulk_upsert` batch data insertion operation is performed through it.
### get_conn

Returns a `YDBConnection` object, which implements the `DbApiConnection` interface for working with data. The `DbApiConnection` class provides a standardized interface for interacting with the database, allowing operations such as establishing connections, executing SQL queries, and managing transactions, regardless of the specific database management system.
Example:

```python
hook = YDBHook(ydb_conn_id=...)

# Execute the SQL query and get the cursor
connection = hook.get_conn()
cursor = connection.cursor()
cursor.execute("SELECT * FROM pet;")

# Extract the result and column names
result = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]

# Close the cursor and connection
cursor.close()
connection.close()
```
In this example, a `YDBHook` object is created, and a `YDBConnection` object is requested from it. This connection is then used to read data and retrieve the list of columns.
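Since `YDBConnection` follows the DB-API style, cleanup can also be delegated to `contextlib.closing`. The following is a minimal sketch of this pattern, assuming the `test_ydb_connection` connection used in the example at the end of this article:

```python
from contextlib import closing

from airflow.providers.ydb.hooks.ydb import YDBHook

hook = YDBHook(ydb_conn_id="test_ydb_connection")  # assumed connection ID

# closing() guarantees close() is called even if the query raises
with closing(hook.get_conn()) as connection:
    with closing(connection.cursor()) as cursor:
        cursor.execute("SELECT COUNT(*) FROM pet;")
        total = cursor.fetchone()[0]
```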
## Connection to YDB

To connect to YDB, create a new Apache Airflow™ connection with the `YDB` type or edit an existing one.
Connection parameters:

- `Connection Id` — the Apache Airflow™ connection identifier.
- `Host` — the protocol and cluster address of YDB.
- `Port` — the port of YDB.
- `Database name` — the name of the YDB database.
Specify the details for one of the following authentication methods on the YDB cluster:
- `Login` and `Password` — specify user credentials for using static credentials.
- `Service account auth JSON` — specify the value of the `Service Account Key`.
- `Service account auth JSON file path` — specify the path to the `Service Account Key` file.
- `IAM token` — specify the IAM token.
- `Use VM metadata` — enable this option to use virtual machine metadata.
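Besides the UI, a connection can be created with the `airflow connections add` CLI command. The sketch below is a hypothetical example with static credentials; the host, port, and database values, and in particular the assumption that the `Database name` field maps to `--conn-schema`, must be verified against your installation:

```bash
# Hypothetical values; verify the field mapping against your Airflow version.
airflow connections add test_ydb_connection \
    --conn-type ydb \
    --conn-host grpc://ydb.example.com \
    --conn-port 2136 \
    --conn-schema /local/testdb \
    --conn-login my_user \
    --conn-password my_password
```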
## Matching between YQL and Python types

Below are the rules for converting YQL types to Python results. Types not listed below are not supported.
### Scalar types

| YQL type | Python type | Example in Python |
|---|---|---|
| `Int8`, `Int16`, `Int32`, `Uint8`, `Uint16`, `Uint32`, `Int64`, `Uint64` | `int` | `647713` |
| `Bool` | `bool` | `True` |
| `Float`, `Double` | `float` (`NaN` and `Inf` are represented as `None`) | `7.88731023`, `None` |
| `Decimal` | `Decimal` | `45.23410083` |
| `Utf8` | `str` | `"Text of string"` |
| `String` | `str` | `"Text of string"` |
### Complex types

| YQL type | Python type | Example in Python |
|---|---|---|
| `Json`, `JsonDocument` | `str` (the entire node is inserted as a string) | `{"a":[1,2,3]}` |
| `Date` | `datetime.date` | `2022-02-09` |
| `Datetime`, `Timestamp` | `datetime.datetime` | `2022-02-09 10:13:11` |
### Optional types

| YQL type | Python type | Example in Python |
|---|---|---|
| `Optional` | original type or `None` | `1` |
### Containers

| YQL type | Python type | Example in Python |
|---|---|---|
| `List<Type>` | `list` | `[1,2,3,4]` |
| `Dict<KeyType, ValueType>` | `dict` | `{"key1": "value1", "key2": "value2"}` |
| `Set<KeyType>` | `set` | `{"key_value1", "key_value2"}` |
| `Tuple<Type1, Type2>` | `tuple` | `(element1, element2)` |
| `Struct<Name:Utf8, Age:Int32>` | `dict` | `{"Name": "value1", "Age": value2}` |
### Special types

| YQL type | Python type |
|---|---|
| `Void`, `Null` | `None` |
| `EmptyList` | `[]` |
| `EmptyDict` | `{}` |
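As an illustration of these conversions, the sketch below reads rows through `YDBHook` and inspects the resulting Python types; the `pet` table and the `test_ydb_connection` connection are taken from the example in the next section:

```python
from airflow.providers.ydb.hooks.ydb import YDBHook

hook = YDBHook(ydb_conn_id="test_ydb_connection")  # assumed connection ID
connection = hook.get_conn()
cursor = connection.cursor()
cursor.execute("SELECT pet_id, name, birth_date FROM pet;")
for pet_id, name, birth_date in cursor.fetchall():
    # pet_id is Optional<Int32> -> int or None; name and birth_date are Utf8 -> str
    print(type(pet_id), type(name), type(birth_date))
cursor.close()
connection.close()
```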
## Example

To make requests to YDB, the package provides the Apache Airflow™ operator `YDBExecuteQueryOperator` and the hook `YDBHook`.
In the example below, a `create_pet_table` task is launched to create a table in YDB. After the table is successfully created, the `populate_pet_table` task runs to populate the table with data using `UPSERT` statements. Additionally, the `populate_pet_table_via_bulk_upsert` task fills the table using `bulk_upsert`. After the data is inserted, a read operation is performed by the `get_all_pets` task and by the `get_birth_date` task, which demonstrates parameterized reading.

To execute queries in YDB, a pre-created connection of the `YDB` type named `test_ydb_connection` is used.
```python
from __future__ import annotations

import datetime

import ydb
from airflow import DAG
from airflow.decorators import task
from airflow.providers.ydb.hooks.ydb import YDBHook
from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator


@task
def populate_pet_table_via_bulk_upsert():
    hook = YDBHook(ydb_conn_id="test_ydb_connection")
    column_types = (
        ydb.BulkUpsertColumns()
        .add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
        .add_column("name", ydb.PrimitiveType.Utf8)
        .add_column("pet_type", ydb.PrimitiveType.Utf8)
        .add_column("birth_date", ydb.PrimitiveType.Utf8)
        .add_column("owner", ydb.PrimitiveType.Utf8)
    )
    rows = [
        {"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
        {"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
    ]
    hook.bulk_upsert("pet", rows=rows, column_types=column_types)


with DAG(
    dag_id="ydb_demo_dag",
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
) as dag:
    create_pet_table = YDBExecuteQueryOperator(
        task_id="create_pet_table",
        sql="""
            CREATE TABLE pet (
                pet_id INT,
                name TEXT NOT NULL,
                pet_type TEXT NOT NULL,
                birth_date TEXT NOT NULL,
                owner TEXT NOT NULL,
                PRIMARY KEY (pet_id)
            );
        """,
        is_ddl=True,  # must be specified for DDL queries
        ydb_conn_id="test_ydb_connection",
    )

    populate_pet_table = YDBExecuteQueryOperator(
        task_id="populate_pet_table",
        sql="""
            UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
            VALUES (1, 'Max', 'Dog', '2018-07-05', 'Jane');

            UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
            VALUES (2, 'Susie', 'Cat', '2019-05-01', 'Phil');
        """,
        ydb_conn_id="test_ydb_connection",
    )

    get_all_pets = YDBExecuteQueryOperator(
        task_id="get_all_pets",
        sql="SELECT * FROM pet;",
        ydb_conn_id="test_ydb_connection",
    )

    get_birth_date = YDBExecuteQueryOperator(
        task_id="get_birth_date",
        sql="SELECT * FROM pet WHERE birth_date BETWEEN '{{ params.begin_date }}' AND '{{ params.end_date }}'",
        params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
        ydb_conn_id="test_ydb_connection",
    )

    (
        create_pet_table
        >> populate_pet_table
        >> populate_pet_table_via_bulk_upsert()
        >> get_all_pets
        >> get_birth_date
    )
```
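To try the DAG once without waiting for the scheduler, you can run it from the command line; `airflow dags test` executes a single DAG run in-process, which is a common way to debug new DAGs:

```bash
airflow dags test ydb_demo_dag 2024-01-01
```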