Выполнение повторных попыток

YDB является распределенной СУБД с автоматическим масштабированием под нагрузку.
На серверной стороне могут проводиться работы, серверные стойки или целые дата-центры могут быть временно отключены.
В связи с этим допускаются некоторые ошибки при работе с YDB.
В зависимости от типа ошибки следует по разному реагировать на них.
YDB SDK для обеспечения высокой доступности предоставляют встроенные средства выполнения повторных попыток,
в которых учтены типы ошибок и закреплена реакция на них.

Ниже приведены примеры кода использования встроенных в YDB SDK средств выполнения повторных попыток:

В YDB C++ SDK выполнение повторных попыток с корректной обработкой ошибок реализовано в нескольких программных интерфейсах:

Синхронное выполнение повторных попыток

Для выполнения запросов с автоматическими повторными попытками используется метод RetryQuerySync.
Метод принимает лямбда-функцию, которая получает объект сессии и возвращает результат запроса.
YDB C++ SDK автоматически анализирует ошибки и выполняет повторные попытки в соответствии с их типом.

Пример кода, использующего RetryQuerySync:

#include <ydb-cpp-sdk/client/query/client.h>

void ExecuteQueryWithRetry(NYdb::NQuery::TQueryClient client) {
    auto result = client.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus {
        auto query = R"(
            SELECT series_id, title
            FROM series
            WHERE series_id = 1;
        )";

        auto result = session.ExecuteQuery(
            query,
            NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx()
        ).GetValueSync();

        if (!result.IsSuccess()) {
            return result;
        }

        // Обработка результата запроса
        auto resultSet = result.GetResultSet(0);
        NYdb::TResultSetParser parser(resultSet);
        while (parser.TryNextRow()) {
            std::cout << "Series"
                << ", Id: " << parser.ColumnParser("series_id").GetOptionalUint64().value()
                << ", Title: " << parser.ColumnParser("title").GetOptionalUtf8().value()
                << std::endl;
        }

        return result;
    });

    if (!result.IsSuccess()) {
        // Обработка ошибки после всех попыток
        std::cerr << "Query failed: " << result.GetIssues().ToString() << std::endl;
    }
}
Асинхронное выполнение повторных попыток

Для асинхронного выполнения запросов с автоматическими повторными попытками используется метод RetryQuery.
Метод возвращает NThreading::TFuture, что позволяет выполнять операции асинхронно.

Пример кода, использующего RetryQuery:

#include <ydb-cpp-sdk/client/query/client.h>

void ExecuteQueryWithRetryAsync(NYdb::NQuery::TQueryClient client) {
    auto future = client.RetryQuery([](NYdb::NQuery::TSession session) -> NYdb::TAsyncStatus {
        auto query = R"(
            SELECT series_id, title, release_date
            FROM series
            WHERE series_id = 1;
        )";

        return session.ExecuteQuery(
            query,
            NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx()
        ).Apply([](const NYdb::NQuery::TAsyncExecuteQueryResult& asyncResult) -> NYdb::TStatus {
            auto result = asyncResult.GetValue();
            if (!result.IsSuccess()) {
                return result;
            }

            // Обработка результата запроса
            auto resultSet = result.GetResultSet(0);
            NYdb::TResultSetParser parser(resultSet);
            while (parser.TryNextRow()) {
                std::cout << "Series"
                    << ", Id: " << parser.ColumnParser("series_id").GetOptionalUint64().value()
                    << ", Title: " << parser.ColumnParser("title").GetOptionalUtf8().value()
                    << std::endl;
            }

            return result;
        });
    });

    // Ожидание завершения
    auto status = future.GetValueSync();
    if (!status.IsSuccess()) {
        std::cerr << "Query failed: " << status.GetIssues().ToString() << std::endl;
    }
}
Выполнение повторных попыток при работе со стриминговыми запросами

Для выполнения стриминговых запросов с автоматическими повторными попытками используется метод StreamExecuteQuery.
Стриминговые запросы позволяют обрабатывать большие объемы данных, получая результаты частями.

Пример кода, использующего RetryQuerySync со StreamExecuteQuery:

#include <ydb-cpp-sdk/client/query/client.h>

void StreamQueryWithRetry(NYdb::NQuery::TQueryClient client) {
    auto result = client.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus {
        auto query = R"(
            SELECT series_id, title, release_date
            FROM series
            WHERE series_id > 0;
        )";

        auto resultStreamQuery = session.StreamExecuteQuery(
            query,
            NYdb::NQuery::TTxControl::NoTx()
        ).GetValueSync();

        if (!resultStreamQuery.IsSuccess()) {
            return resultStreamQuery;
        }

        // Обработка результатов по частям
        bool eos = false;
        while (!eos) {
            auto streamPart = resultStreamQuery.ReadNext().ExtractValueSync();

            if (!streamPart.IsSuccess()) {
                eos = true;
                if (!streamPart.EOS()) {
                    return streamPart;
                }
                continue;
            }

            if (streamPart.HasResultSet()) {
                auto rs = streamPart.ExtractResultSet();
                NYdb::TResultSetParser parser(rs);
                while (parser.TryNextRow()) {
                    std::cout << "Series"
                        << ", Id: " << parser.ColumnParser("series_id").GetOptionalUint64().value()
                        << ", Title: " << parser.ColumnParser("title").GetOptionalUtf8().value()
                        << std::endl;
                }
            }
        }

        return resultStreamQuery;
    });

    if (!result.IsSuccess()) {
        std::cerr << "Stream query failed: " << result.GetIssues().ToString() << std::endl;
    }
}
Настройка параметров повторных попыток

Пользователь может настраивать поведение механизма повторных попыток с помощью класса TRetryOperationSettings:

  • MaxRetries(uint32_t) - максимальное количество повторных попыток (по умолчанию 10)
  • Idempotent(bool) - признак идемпотентности операции. Идемпотентные операции повторяются для более широкого списка ошибок
  • RetryNotFound(bool) - повторять ли операции, вернувшие статус NOT_FOUND (по умолчанию true)
  • MaxTimeout(TDuration) - максимальное время выполнения всех попыток
  • FastBackoffSettings(TBackoffSettings) - настройки быстрых повторов
  • SlowBackoffSettings(TBackoffSettings) - настройки медленных повторов

Пример использования настроек повторных попыток:

#include <ydb-cpp-sdk/client/query/client.h>
#include <ydb-cpp-sdk/client/retry/retry.h>

void ExecuteWithCustomRetry(NYdb::NQuery::TQueryClient client) {
    auto retrySettings = NYdb::NRetry::TRetryOperationSettings()
        .Idempotent(true)
        .MaxRetries(20)
        .MaxTimeout(TDuration::Seconds(30));

    auto result = client.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus {
        auto query = R"(
            UPSERT INTO series (series_id, title)
            VALUES (10, "New Series");
        )";

        auto result = session.ExecuteQuery(
            query,
            NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx()
        ).GetValueSync();

        if (!result.IsSuccess()) {
            return result;
        }

        // Обработка результата запроса
        std::cout << "Query executed successfully" << std::endl;
        return result;
    }, retrySettings);

    if (!result.IsSuccess()) {
        std::cerr << "Operation failed: " << result.GetIssues().ToString() << std::endl;
    }
}

В YDB Go SDK корректная обработка ошибок закреплена в нескольких программных интерфейсах:

Функция повторов общего назначения

Основная логика обработки ошибок реализуется функцией-помощником retry.Retry
Подробности выполнения повторных запросов максимально скрыты.
Пользователь может влиять на логику работы функции retry.Retry двумя способами:

  • через контекст (можно устанавливать deadline и cancel);
  • через флаг идемпотентности операции retry.WithIdempotent(). По умолчанию операция считается неидемпотентной.

Пользователь передает свою функцию в retry.Retry, которая по своей сигнатуре должна возвращать ошибку.
В случае, если из пользовательской функции вернулся nil, то повторные запросы прекращаются.
В случае, если из пользовательской функции вернулась ошибка, YDB Go SDK пытается идентифицировать эту ошибку и в зависимости от нее выполняет повторные попытки.

Пример кода, использующего функцию retry.Retry:

package main

import (
    "context"
    "time"

    "github.com/ydb-platform/ydb-go-sdk/v3"
    "github.com/ydb-platform/ydb-go-sdk/v3/retry"
)

func main() {
    db, err := ydb.Open(ctx,
        os.Getenv("YDB_CONNECTION_STRING"),
    )
    if err != nil {
        panic(err)
    }
    defer db.Close(ctx)
    var cancel context.CancelFunc
    // fix deadline for retries
    ctx, cancel := context.WithTimeout(ctx, time.Second)
    err = retry.Retry(
        ctx,
        func(ctx context.Context) error {
            whoAmI, err := db.Discovery().WhoAmI(ctx)
            if err != nil {
                return err
            }
            fmt.Println(whoAmI)
            return nil
        },
        retry.WithIdempotent(true),
    )
    if err != nil {
        panic(err)
    }
}
Выполнение повторных попыток при ошибках на объекте сессии YDB

Для повторной обработки ошибок на уровне сессии сервиса таблиц YDB есть функция db.Table().Do(ctx, op), которая предоставляет подготовленную сессию для выполнения запросов.
Функция db.Table().Do(ctx, op) использует пакет retry, а также следит за временем жизни сессий YDB.
Из пользовательской операции op согласно ее сигнатуре требуется возвращать ошибку или nil, чтобы драйвер смог по типу ошибки "понять" что нужно делать: поторять операцию или нет, с задержкой или нет, на этой же сессии или новой.
Пользователь может влиять на логику выполнения повторных запросов через контекст и признак идемпотентности, а YDB Go SDK интерпретирует возвращаемые из op ошибки.

Пример кода, использующего функцию db.Table().Do(ctx, op):

err := db.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
    desc, err = s.DescribeTableOptions(ctx)
    return
}, table.WithIdempotent())
if err != nil {
    return err
}
Выполнение повторных попыток при ошибках на объекте интерактивной транзакции YDB

Для повторной обработки ошибок на уровне интерактивной транзакции сервиса таблиц YDB есть функция db.Table().DoTx(ctx, txOp), которая предоставляет подготовленную транзакцию YDB на сессии для выполнения запросов.
Функция db.Table().DoTx(ctx, txOp) использует пакет retry, а также следит за временем жизни сессий YDB.
Из пользовательской операции txOp согласно ее сигнатуре требуется возвращать ошибку или nil, чтобы драйвер смог по типу ошибки "понять" что нужно делать: поторять операцию или нет, с задержкой или нет, на этой же транзакции или новой.
Пользователь может влиять на логику выполнения повторных запросов через контекст и признак идемпотентности, а YDB Go SDK интерпретирует возвращаемые из op ошибки.

Пример кода, использующего функцию db.Table().DoTx(ctx, op):

err := db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) error {
    _, err := tx.Execute(ctx,
        "DECLARE $id AS Int64; INSERT INTO test (id, val) VALUES($id, 'asd')",
        table.NewQueryParameters(table.ValueParam("$id", types.Int64Value(100500))),
    )
    return err
}, table.WithIdempotent())
if err != nil {
    return err
}
Запросы к остальным сервисам YDB

(db.Scripting(), db.Scheme(), db.Coordination(), db.Ratelimiter(), db.Discovery()) также используют внутри себя функцию retry.Retry для выполнения повторных запросов и не требуют использования внешних вспомогательных функций для повторов.

Стандартный пакет database/sql использует внутреннюю логику выполнения повторов на основе тех ошибок, что возвращает конкретная реализация драйвера.
Так, в коде пакета database/sql во многих местах можно встретить политику повторов из трех попыток:

  • 2 попытки выполнить на существующем соединении или новом (если пул соединений database/sql пуст)
  • 1 попытка выполнить на новом соединении.

В большинстве случаев такой политики повторов достаточно, чтобы пережить временную недоступность нод YDB или проблемы с сессией YDB.

YDB Go SDK предоставляет специальные функции для гарантированного выполнения пользовательской операции:

Выполнение повторных попыток при ошибках на объекте соединения *sql.Conn:

Для повторной обработки ошибок на объекте соединения *sql.Conn есть вспомогательная функция retry.Do(ctx, db, op), которая предоставляет подготовленное соединение *sql.Conn для выполнения запросов.
В функцию retry.Do требуется передать контекст, объект базы данных, а также пользовательскую операцию, которую требуется выполнить.
Из клиентского кода можно влиять на логику выполнения повторных запросов через контекст и признак идемпотентности, а YDB Go SDK в свою очередь интерпретирует возвращаемые из op ошибки.

Пользовательская операция op должна возвращать ошибку или nil:

  • в случае, если из пользовательской функции вернулся nil, то повторные запросы прекращаются;
  • в случае, если из пользовательской функции вернулась ошибка, YDB Go SDK пытается идентифицировать эту ошибку и в зависимости от нее предпринимает повторные попытки.

Пример кода, использующего функцию retry.Do:

import (
    "context"
    "database/sql"
    "fmt"
    "log"

    "github.com/ydb-platform/ydb-go-sdk/v3/retry"
)

func main() {
    ...
    err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) (err error) {
        row = cc.QueryRowContext(ctx, `
                PRAGMA TablePathPrefix("/local");
                DECLARE $seriesID AS Uint64;
                DECLARE $seasonID AS Uint64;
                DECLARE $episodeID AS Uint64;
                SELECT views FROM episodes WHERE series_id = $seriesID AND season_id = $seasonID AND episode_id = $episodeID;
            `,
            sql.Named("seriesID", uint64(1)),
            sql.Named("seasonID", uint64(1)),
            sql.Named("episodeID", uint64(1)),
        )
        var views sql.NullFloat64
        if err = row.Scan(&views); err != nil {
            return fmt.Errorf("cannot scan views: %w", err)
        }
        if views.Valid {
            return fmt.Errorf("unexpected valid views: %v", views.Float64)
        }
        log.Printf("views = %v", views)
        return row.Err()
    }, retry.WithDoRetryOptions(retry.WithIdempotent(true)))
    if err != nil {
        log.Printf("retry.Do failed: %v\n", err)
    }
}
Выполнение повторных попыток при ошибках на объекте интерактивной транзакции *sql.Tx:

Для повторной обработки ошибок на объекте интерактивной транзакции *sql.Tx есть вспомогательная функция retry.DoTx(ctx, db, op), которая предоставляет подготовленную транзакцию *sql.Tx для выполнения запросов.
В функцию retry.DoTx требуется передать контекст, объект базы данных, а также пользовательскую операцию, которую требуется выполнить.
В функцию приходит подготовленная транзакция *sql.Tx, на которой следует выполнять запросы к YDB.
Из клиентского кода можно влиять на логику выполнения повторных запросов через контекст и признак идемпотентности операции, а YDB Go SDK в свою очередь интерпретирует возвращаемые из op ошибки.

Пользовательская операция op должна возвращать ошибку или nil:

  • в случае, если из пользовательской функции вернулся nil, то повторные запросы прекращаются;
  • в случае, если из пользовательской функции вернулась ошибка, YDB Go SDK пытается идентифицировать эту ошибку и в зависимости от нее предпринимает повторные попытки.

Функция retry.DoTx использует режим изоляции read-write транзакции sql.LevelDefault по умолчанию, который можно изменить через опцию retry.WithTxOptions.

Пример кода, использующего функцию retry.Do:

import (
    "context"
    "database/sql"
    "fmt"
    "log"

    "github.com/ydb-platform/ydb-go-sdk/v3/retry"
)

func main() {
    ...
    err = retry.DoTx(ctx, db, func(ctx context.Context, tx *sql.Tx) error {
        row := tx.QueryRowContext(ctx,`
                PRAGMA TablePathPrefix("/local");
                DECLARE $seriesID AS Uint64;
                DECLARE $seasonID AS Uint64;
                DECLARE $episodeID AS Uint64;
                SELECT views FROM episodes WHERE series_id = $seriesID AND season_id = $seasonID AND episode_id = $episodeID;
            `,
            sql.Named("seriesID", uint64(1)),
            sql.Named("seasonID", uint64(1)),
            sql.Named("episodeID", uint64(1)),
        )
        var views sql.NullFloat64
        if err = row.Scan(&views); err != nil {
            return fmt.Errorf("cannot select current views: %w", err)
        }
        if !views.Valid {
            return fmt.Errorf("unexpected invalid views: %v", views)
        }
        t.Logf("views = %v", views)
        if views.Float64 != 1 {
            return fmt.Errorf("unexpected views value: %v", views)
        }
        return nil
    }, retry.WithDoTxRetryOptions(retry.WithIdempotent(true)), retry.WithTxOptions(&sql.TxOptions{
        Isolation: sql.LevelSnapshot,
        ReadOnly:  true,
    }))
    if err != nil {
        log.Printf("do tx failed: %v\n", err)
    }
}

В YDB Java SDK механизм повторных запросов реализован в виде класс хелпера SessionRetryContext. Данный класс конструируется с помощью метода SessionRetryContext.create в который требуется передать реализацию интерфейса SessionSupplier - как правило это экземпляр класса TableClient или QueryClient.

Дополнительно пользователь может задавать некоторые другие опции:

  • maxRetries(int maxRetries) - максимальное количество повторов операции, не включает в себя первое выполение. Значение по умолчанию 10
  • retryNotFound(boolean retryNotFound) - опция повтора операций, вернувших статус NOT_FOUND. По умолчанию включено.
  • idempotent(boolean idempotent) - признак идемпотентности операций. Идемпотентные операции будут повторяться для более широкого списка ошибок. По умолчанию отключено.

Для запуска операций с ретраями класс SessionRetryContext предоставляет два метода:

  • CompletableFuture<Status> supplyStatus - выполнение операции, возвращающей статус. В качестве аргумента принимает лямбду Function<Session, CompletableFuture<Status>> fn
  • CompletableFuture<Result<T>> supplyResult - выполнение операции, возвращающей данные. В качестве аргумента принимает лямбду Function<Session, CompletableFuture<Result<T>>> fn

При использовании класса SessionRetryContext нужно учитывать, что повторное исполнение операции будет выполняться в следующих случаях:

  • Лямбда вернула retryable код ошибки

  • В рамках исполнения лямбды была вызвано UnexpectedResultException c retryable кодом ошибки

    Пример кода, использующего SessionRetryContext.supplyStatus:
    private void createTable(TableClient tableClient, String database, String tableName) {
        SessionRetryContext retryCtx = SessionRetryContext.create(tableClient).build();
        TableDescription pets = TableDescription.newBuilder()
                .addNullableColumn("species", PrimitiveType.Text)
                .addNullableColumn("name", PrimitiveType.Text)
                .addNullableColumn("color", PrimitiveType.Text)
                .addNullableColumn("price", PrimitiveType.Float)
                .setPrimaryKeys("species", "name")
                .build();
    
        String tablePath = database + "/" + tableName;
        retryCtx.supplyStatus(session -> session.createTable(tablePath, pets))
                .join().expectSuccess();
    }
    
    Пример кода, использующего SessionRetryContext.supplyResult:
    private void selectData(TableClient tableClient, String tableName) {
        SessionRetryContext retryCtx = SessionRetryContext.create(tableClient).build();
        String selectQuery
                = "DECLARE $species AS Text;"
                + "DECLARE $name AS Text;"
                + "SELECT * FROM " + tableName + " "
                + "WHERE species = $species AND name = $name;";
    
        Params params = Params.of(
                "$species", PrimitiveValue.newText("cat"),
                "$name", PrimitiveValue.newText("Tom")
        );
    
        DataQueryResult data = retryCtx
                .supplyResult(session -> session.executeDataQuery(selectQuery, TxControl.onlineRo(), params))
                .join().getValue();
    
        ResultSetReader rsReader = data.getResultSet(0);
        logger.info("Result of select query:");
        while (rsReader.next()) {
            logger.info("  species: {}, name: {}, color: {}, price: {}",
                    rsReader.getColumn("species").getText(),
                    rsReader.getColumn("name").getText(),
                    rsReader.getColumn("color").getText(),
                    rsReader.getColumn("price").getFloat()
            );
        }
    }
    

Повторные попытки на уровне SessionRetryContext относятся к нативному API (TableClient / QueryClient). При работе через JDBC используйте ретраи на уровне приложения или подключайте нативный транспорт и клиент, как в разделе Инициализация драйвера.

В YDB Python SDK выполнение повторных попыток реализовано в QuerySessionPool с использованием класса RetrySettings для настройки параметров повторов. Класс RetrySettings поддерживает следующие опции:

  • max_retries - максимальное количество повторных попыток (по умолчанию 10)
  • idempotent - признак идемпотентности операции. Идемпотентные операции повторяются для более широкого списка ошибок (по умолчанию False)
  • backoff_ceiling, backoff_slot_duration - параметры алгоритма экспоненциальной задержки
  • fast_backoff_settings, slow_backoff_settings - настройки быстрых и медленных повторов

Для выполнения запросов с повторными попытками QuerySessionPool предоставляет методы retry_operation_sync и execute_with_retries. Метод execute_with_retries предназначен для разовых запросов с неявным режимом транзакции (implicit). Для остальных случаев (явные транзакции, несколько операций в одной транзакции) используйте retry_operation_sync.

Пример кода, использующего execute_with_retries:

import ydb

def execute_query(pool: ydb.QuerySessionPool):
    result_sets = pool.execute_with_retries(
        "SELECT series_id, title FROM series WHERE series_id = 1;",
        retry_settings=ydb.RetrySettings(idempotent=True),
    )
    # ...

Пример кода, использующего retry_operation_sync:

import ydb

def execute_query(pool: ydb.QuerySessionPool):
    def callee(session: ydb.QuerySession):
          with session.transaction().execute(
              "SELECT 1",
              commit_tx=True,
          ) as result_sets:
              pass

    result = pool.retry_operation_sync(
        callee,
        retry_settings=ydb.RetrySettings(max_retries=20, idempotent=True),
    )
    # ...

Пример кода, использующего execute_with_retries:

import ydb

async def execute_query(pool: ydb.aio.QuerySessionPool):
    result_sets = await pool.execute_with_retries(
        "SELECT series_id, title FROM series WHERE series_id = 1;",
        retry_settings=ydb.RetrySettings(idempotent=True),
    )
    # ...

Пример кода, использующего retry_operation_sync:

import ydb

async def execute_query(pool: ydb.aio.QuerySessionPool):
    async def callee(session):
        async with session.transaction(tx_mode=ydb.QuerySerializableReadWrite()) as tx:
            async with await tx.execute("SELECT 1", commit_tx=True) as result_sets:
                pass

    await pool.retry_operation_async(
        callee,
        retry_settings=ydb.RetrySettings(max_retries=20, idempotent=True),
    )
    # ...

При использовании YDB через SQLAlchemy выполнение повторных попыток происходит под капотом и не регулируется снаружи.