Retrying
YDB is a distributed database management system with automatic load scaling.
Routine maintenance can be carried out on the server side, with server racks or entire data centers temporarily shut down.
This may result in errors when working with YDB.
Depending on the error type, you should respond differently.
YDB SDKs provide built-in retry tools that take the error type into account and apply the appropriate handling for each type, which helps ensure high availability.
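The idea of responding to each error type differently can be illustrated with a small classifier. The following is a minimal Python sketch, not SDK code: the status names are YDB status codes, but the Action enum, the classify function, and the action assigned to each status are illustrative assumptions. The real SDKs keep their own, more detailed tables.

```python
from enum import Enum, auto


class Action(Enum):
    """What a retry loop does after a failed attempt."""
    FAIL = auto()          # give up and return the error to the caller
    RETRY_NOW = auto()     # retry at once, on a fresh session
    FAST_BACKOFF = auto()  # retry after a short delay
    SLOW_BACKOFF = auto()  # retry after a longer delay


# Illustrative mapping (an assumption, not the SDK's table): statuses that
# are retried regardless of whether the operation is idempotent.
ALWAYS_RETRIED = {
    "BAD_SESSION": Action.RETRY_NOW,
    "UNAVAILABLE": Action.FAST_BACKOFF,
    "OVERLOADED": Action.SLOW_BACKOFF,
}

# Statuses after which the operation may or may not have been applied:
# repeating it is only safe when the operation is idempotent.
IDEMPOTENT_ONLY = {
    "UNDETERMINED": Action.FAST_BACKOFF,
    "TIMEOUT": Action.FAST_BACKOFF,
}


def classify(status: str, idempotent: bool) -> Action:
    """Choose the retry action for a failed attempt with the given status."""
    if status in ALWAYS_RETRIED:
        return ALWAYS_RETRIED[status]
    if idempotent and status in IDEMPOTENT_ONLY:
        return IDEMPOTENT_ONLY[status]
    return Action.FAIL  # e.g. SCHEME_ERROR: repeating the query would fail again


print(classify("OVERLOADED", idempotent=False))    # Action.SLOW_BACKOFF
print(classify("UNDETERMINED", idempotent=False))  # Action.FAIL
print(classify("UNDETERMINED", idempotent=True))   # Action.FAST_BACKOFF
```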
Below are code examples showing the YDB SDK built-in tools for retries:
In the YDB C++ SDK, retries with correct error handling are implemented by several programming interfaces:
Synchronous retry attempts
The RetryQuerySync method is used to execute queries with automatic retries.
The method accepts a lambda function that receives a session object and returns the query result.
The YDB C++ SDK automatically analyzes errors and performs retries according to their type.
Example code using RetryQuerySync:
#include <ydb-cpp-sdk/client/query/client.h>
#include <iostream>
void ExecuteQueryWithRetry(NYdb::NQuery::TQueryClient client) {
    auto result = client.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus {
        auto query = R"(
            SELECT series_id, title
            FROM series
            WHERE series_id = 1;
        )";
        auto result = session.ExecuteQuery(
            query,
            NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx()
        ).GetValueSync();
        if (!result.IsSuccess()) {
            // Return the error status: RetryQuerySync decides whether to retry
            return result;
        }
        // Process query results
        auto resultSet = result.GetResultSet(0);
        NYdb::TResultSetParser parser(resultSet);
        while (parser.TryNextRow()) {
            std::cout << "Series"
                << ", Id: " << parser.ColumnParser("series_id").GetOptionalUint64().value()
                << ", Title: " << parser.ColumnParser("title").GetOptionalUtf8().value()
                << std::endl;
        }
        return result;
    });
    if (!result.IsSuccess()) {
        // Handle error after all retry attempts
        std::cerr << "Query failed: " << result.GetIssues().ToString() << std::endl;
    }
}
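The contract behind RetryQuerySync, where the caller supplies only the per-attempt logic and the wrapper owns sessions and retries, can be modeled in a few lines. This is a simplified Python sketch, not the SDK implementation: retry_sync, RetryableError, and the fixed delay are hypothetical, and a real SDK also reuses pooled sessions and picks the delay by error type.

```python
import itertools
import time
from typing import Callable, TypeVar

T = TypeVar("T")
S = TypeVar("S")


class RetryableError(Exception):
    """Hypothetical marker for an error that is safe to retry."""


def retry_sync(op: Callable[[S], T], new_session: Callable[[], S],
               max_retries: int = 10, delay: float = 0.01) -> T:
    """Call op(session) until it succeeds or max_retries is exhausted.

    Errors other than RetryableError propagate immediately.
    """
    attempt = 0
    while True:
        try:
            return op(new_session())  # each attempt gets its own session
        except RetryableError:
            if attempt >= max_retries:
                raise  # retries exhausted: the caller sees the last error
            attempt += 1
            time.sleep(delay)


# Usage: an operation that fails twice with a retryable error, then succeeds.
sessions = (f"session-{n}" for n in itertools.count())
calls = []


def select_series(session: str) -> str:
    calls.append(session)
    if len(calls) < 3:
        raise RetryableError("UNAVAILABLE")
    return f"rows read via {session}"


result = retry_sync(select_series, lambda: next(sessions), delay=0)
print(result)  # rows read via session-2
```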
Asynchronous retry attempts
The RetryQuery method is used for asynchronous query execution with automatic retries.
The method returns an NThreading::TFuture, so the caller is not blocked while the query and its retries are running.
Example code using RetryQuery:
#include <ydb-cpp-sdk/client/query/client.h>
#include <iostream>
void ExecuteQueryWithRetryAsync(NYdb::NQuery::TQueryClient client) {
    auto future = client.RetryQuery([](NYdb::NQuery::TSession session) -> NYdb::TAsyncStatus {
        auto query = R"(
            SELECT series_id, title, release_date
            FROM series
            WHERE series_id = 1;
        )";
        return session.ExecuteQuery(
            query,
            NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx()
        ).Apply([](const NYdb::NQuery::TAsyncExecuteQueryResult& asyncResult) -> NYdb::TStatus {
            auto result = asyncResult.GetValue();
            if (!result.IsSuccess()) {
                return result;
            }
            // Process query results
            auto resultSet = result.GetResultSet(0);
            NYdb::TResultSetParser parser(resultSet);
            while (parser.TryNextRow()) {
                std::cout << "Series"
                    << ", Id: " << parser.ColumnParser("series_id").GetOptionalUint64().value()
                    << ", Title: " << parser.ColumnParser("title").GetOptionalUtf8().value()
                    << std::endl;
            }
            return result;
        });
    });
    // Wait for completion
    auto status = future.GetValueSync();
    if (!status.IsSuccess()) {
        std::cerr << "Query failed: " << status.GetIssues().ToString() << std::endl;
    }
}
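The benefit of the asynchronous variant is that a pending retry does not block the caller. A minimal Python sketch of the same idea with asyncio, assuming a hypothetical retry_async helper and RetryableError marker (neither is an SDK API), runs two retrying operations concurrently on one event loop:

```python
import asyncio
from typing import Awaitable, Callable, Dict, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical marker for an error that is safe to retry."""


async def retry_async(op: Callable[[], Awaitable[T]],
                      max_retries: int = 10, delay: float = 0.01) -> T:
    """Await op() until it succeeds; waiting between attempts yields to other tasks."""
    attempt = 0
    while True:
        try:
            return await op()
        except RetryableError:
            if attempt >= max_retries:
                raise
            attempt += 1
            await asyncio.sleep(delay)  # non-blocking pause before the next attempt


def flaky(name: str, failures: int, attempts: Dict[str, int]) -> Callable[[], Awaitable[str]]:
    """Build an operation that fails `failures` times before succeeding."""
    async def op() -> str:
        attempts[name] = attempts.get(name, 0) + 1
        if attempts[name] <= failures:
            raise RetryableError(f"{name}: OVERLOADED")
        return name
    return op


async def main():
    attempts: Dict[str, int] = {}
    # Both retrying operations run concurrently.
    results = await asyncio.gather(
        retry_async(flaky("a", 2, attempts), delay=0),
        retry_async(flaky("b", 1, attempts), delay=0),
    )
    return results, attempts


results, attempts = asyncio.run(main())
print(results, attempts)
```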
Retry attempts for streaming queries
To execute streaming queries with automatic retries, call StreamExecuteQuery inside the function passed to RetryQuerySync.
Streaming queries allow processing large volumes of data by receiving results in parts.
Example code using RetryQuerySync with StreamExecuteQuery:
#include <ydb-cpp-sdk/client/query/client.h>
#include <iostream>
void StreamQueryWithRetry(NYdb::NQuery::TQueryClient client) {
    auto result = client.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus {
        auto query = R"(
            SELECT series_id, title, release_date
            FROM series
            WHERE series_id > 0;
        )";
        auto resultStreamQuery = session.StreamExecuteQuery(
            query,
            NYdb::NQuery::TTxControl::NoTx()
        ).GetValueSync();
        if (!resultStreamQuery.IsSuccess()) {
            return resultStreamQuery;
        }
        // Process results in parts
        bool eos = false;
        while (!eos) {
            auto streamPart = resultStreamQuery.ReadNext().ExtractValueSync();
            if (!streamPart.IsSuccess()) {
                eos = true;
                if (!streamPart.EOS()) {
                    // A real error, not the end of the stream: return it so the query can be retried
                    return streamPart;
                }
                continue;
            }
            if (streamPart.HasResultSet()) {
                auto rs = streamPart.ExtractResultSet();
                NYdb::TResultSetParser parser(rs);
                while (parser.TryNextRow()) {
                    std::cout << "Series"
                        << ", Id: " << parser.ColumnParser("series_id").GetOptionalUint64().value()
                        << ", Title: " << parser.ColumnParser("title").GetOptionalUtf8().value()
                        << std::endl;
                }
            }
        }
        return resultStreamQuery;
    });
    if (!result.IsSuccess()) {
        std::cerr << "Stream query failed: " << result.GetIssues().ToString() << std::endl;
    }
}
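Because RetryQuerySync re-runs the whole function after a failure, a stream that breaks midway is read again from the start, so in the example above rows from the failed attempt would be printed a second time. One way to avoid double processing is to collect the rows of each attempt and discard them if the attempt fails. The Python sketch below models this; read_stream_with_retry, RetryableError, and the generator standing in for the result stream are hypothetical.

```python
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical marker for a retryable failure while reading a stream."""


def read_stream_with_retry(open_stream: Callable[[], Iterable[List[T]]],
                           max_retries: int = 3) -> List[T]:
    """Read all parts of a streamed result, restarting the query on failure.

    Rows are kept only if the whole stream is read successfully, so rows from
    a failed attempt are never processed twice.
    """
    attempt = 0
    while True:
        rows: List[T] = []  # rows of the current attempt only
        try:
            for part in open_stream():  # each call restarts the query from the beginning
                rows.extend(part)
            return rows  # the stream ended normally
        except RetryableError:
            if attempt >= max_retries:
                raise
            attempt += 1  # drop this attempt's rows and start over


# Usage: the first stream breaks after its first part, the second one completes.
opened = 0


def series_stream():
    global opened
    opened += 1
    yield [1, 2]
    if opened == 1:
        raise RetryableError("connection lost mid-stream")
    yield [3]


rows = read_stream_with_retry(series_stream)
print(rows)  # [1, 2, 3]
```

Buffering gives up the memory advantage of streaming for very large results; the alternative is to make per-part processing idempotent (for example, writing each row by its primary key), so that repeating it is harmless.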
Configuring retry parameters
Users can configure the behavior of the retry mechanism using the TRetryOperationSettings class:
- MaxRetries(uint32_t): maximum number of retry attempts (default is 10).
- Idempotent(bool): indicates whether the operation is idempotent. Idempotent operations are retried for a broader range of errors.
- RetryNotFound(bool): whether to retry operations that returned a NOT_FOUND status (default is true).
- MaxTimeout(TDuration): maximum time for all retry attempts.
- FastBackoffSettings(TBackoffSettings): settings for fast retries.
- SlowBackoffSettings(TBackoffSettings): settings for slow retries.
Example of using retry settings:
#include <ydb-cpp-sdk/client/query/client.h>
#include <ydb-cpp-sdk/client/retry/retry.h>
#include <iostream>
void ExecuteWithCustomRetry(NYdb::NQuery::TQueryClient client) {
    // UPSERT with a fixed key can be repeated safely, so the operation is marked idempotent
    auto retrySettings = NYdb::NRetry::TRetryOperationSettings()
        .Idempotent(true)
        .MaxRetries(20)
        .MaxTimeout(TDuration::Seconds(30));
    auto result = client.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus {
        auto query = R"(
            UPSERT INTO series (series_id, title)
            VALUES (10, "New Series");
        )";
        auto result = session.ExecuteQuery(
            query,
            NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx()
        ).GetValueSync();
        if (!result.IsSuccess()) {
            return result;
        }
        // Process query result
        std::cout << "Query executed successfully" << std::endl;
        return result;
    }, retrySettings);
    if (!result.IsSuccess()) {
        std::cerr << "Operation failed: " << result.GetIssues().ToString() << std::endl;
    }
}
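Judging by their descriptions, MaxRetries and MaxTimeout bound the retry loop from two sides, and whichever limit is reached first ends the retries. A Python sketch of that interplay, assuming a hypothetical RetryLimits settings object and run_with_retries helper (field names only loosely follow TRetryOperationSettings), takes an injectable clock so the timeout can be checked without waiting:

```python
import time
from dataclasses import dataclass
from typing import Callable, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical marker for an error that is safe to retry."""


@dataclass
class RetryLimits:
    """Hypothetical settings; names loosely follow TRetryOperationSettings."""
    max_retries: int = 10
    max_timeout: float = 30.0  # seconds for all attempts together
    delay: float = 0.01        # pause between attempts, in seconds


def run_with_retries(op: Callable[[], T], limits: RetryLimits,
                     clock: Callable[[], float] = time.monotonic) -> T:
    """Retry op() until it succeeds or either limit is reached, whichever is first."""
    deadline = clock() + limits.max_timeout
    retries = 0
    while True:
        try:
            return op()
        except RetryableError:
            out_of_retries = retries >= limits.max_retries
            out_of_time = clock() + limits.delay > deadline
            if out_of_retries or out_of_time:
                raise
            retries += 1
            time.sleep(limits.delay)


# Usage: each attempt "takes" 10 s on a fake clock, so the 30 s budget
# runs out long before the 20 allowed retries.
now = 0.0
attempts = 0


def slow_failing_op() -> None:
    global now, attempts
    attempts += 1
    now += 10.0
    raise RetryableError("TIMEOUT")


try:
    run_with_retries(slow_failing_op,
                     RetryLimits(max_retries=20, max_timeout=30.0, delay=0.0),
                     clock=lambda: now)
except RetryableError:
    print(f"gave up after {attempts} attempts")  # gave up after 4 attempts
```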
In the YDB Go SDK, correct error handling is implemented by several programming interfaces:
General-purpose repeat function
The basic error-handling logic is implemented by the retry.Retry helper function.
The details of retry execution are mostly hidden.
The user can affect the logic of the retry.Retry function in two ways:
- Via the context (where you can set a deadline and cancel the operation).
- Via the operation's idempotency flag retry.WithIdempotent(). By default, the operation is considered non-idempotent.
The user passes to retry.Retry a custom function whose signature returns an error.
If the custom function returns nil, retries stop.
If the custom function returns an error, the YDB Go SDK tries to identify this error and performs retries depending on it.
Example of the code that uses the retry.Retry function:
package main
import (
	"context"
	"fmt"
	"os"
	"time"
	"github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/retry"
)
func main() {
	ctx := context.Background()
	db, err := ydb.Open(ctx,
		os.Getenv("YDB_CONNECTION_STRING"),
	)
	if err != nil {
		panic(err)
	}
	defer db.Close(ctx)
	// Set a deadline for all retry attempts
	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	err = retry.Retry(
		ctx,
		func(ctx context.Context) error {
			whoAmI, err := db.Discovery().WhoAmI(ctx)
			if err != nil {
				return err
			}
			fmt.Println(whoAmI)
			return nil
		},
		retry.WithIdempotent(true),
	)
	if err != nil {
		panic(err)
	}
}
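The contract of retry.Retry can be modeled in a few lines: nil stops the loop, and an error is inspected before deciding whether another attempt is safe. In this Python sketch, retry_op, YdbError, and its retryable and maybe_applied flags are hypothetical stand-ins for the SDK's internal error classification. It shows why the idempotency flag widens the set of retried errors: an error after which the operation may already have been applied can only be repeated safely when repeating it is harmless.

```python
from typing import Callable, Optional


class YdbError(Exception):
    """Hypothetical error carrying an SDK-style classification."""

    def __init__(self, message: str, retryable: bool, maybe_applied: bool = False):
        super().__init__(message)
        self.retryable = retryable
        # True if the server may have applied the operation before the error
        # (for example, the response was lost); repeating it is then only
        # safe for idempotent operations.
        self.maybe_applied = maybe_applied


def retry_op(op: Callable[[], Optional[Exception]], idempotent: bool = False,
             max_attempts: int = 5) -> Optional[Exception]:
    """Go-style contract: op returns None on success or an error to inspect."""
    err: Optional[Exception] = None
    for _ in range(max_attempts):
        err = op()
        if err is None:
            return None  # success: stop retrying
        if not isinstance(err, YdbError) or not err.retryable:
            return err   # unknown or permanent error: do not retry
        if err.maybe_applied and not idempotent:
            return err   # repeating a non-idempotent operation could apply it twice
    return err           # attempts exhausted: return the last error


def lost_response_then_ok():
    """Build an op whose first response is lost and whose second attempt succeeds."""
    outcomes = iter([YdbError("response lost", retryable=True, maybe_applied=True), None])
    return lambda: next(outcomes)


print(retry_op(lost_response_then_ok()))                   # response lost
print(retry_op(lost_response_then_ok(), idempotent=True))  # None
```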
Repeat attempts in case of failed YDB session objects
To retry operations at the level of a YDB table service session, you can use the db.Table().Do(ctx, op) function, which provides a prepared session for query execution.
db.Table().Do(ctx, op) uses the retry package and tracks the lifetime of the YDB sessions.
Based on its signature, the user's operation op must return an error or nil so that the driver can "decide" what to do based on the error type: whether to retry the operation, with or without a delay, and in this session or a new one.
The user can affect the retry logic using the context and the idempotency flag, while the YDB Go SDK interprets errors returned by op.
Example of the code that uses the db.Table().Do(ctx, op) function:
// desc receives the result of the operation
var desc options.TableOptionsDescription
err := db.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) {
	desc, err = s.DescribeTableOptions(ctx)
	return
}, table.WithIdempotent())
if err != nil {
	return err
}
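The choice between retrying "in this session or a new one" can be illustrated with a toy session pool. In this Python sketch, SessionPool, do, and the bad_session flag are hypothetical: an error that marks the session as broken makes the helper drop that session and continue on another one, while other retryable errors are retried on the same session.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RetryableError(Exception):
    """Hypothetical retryable error; bad_session marks the session as broken."""

    def __init__(self, message: str, bad_session: bool = False):
        super().__init__(message)
        self.bad_session = bad_session


class SessionPool:
    """Toy pool handing out numbered sessions (not the SDK's pool)."""

    def __init__(self) -> None:
        self.idle: List[int] = []
        self.created = 0

    def get(self) -> int:
        if self.idle:
            return self.idle.pop()
        self.created += 1
        return self.created

    def put(self, session: int) -> None:
        self.idle.append(session)


def do(pool: SessionPool, op: Callable[[int], T], max_retries: int = 5) -> T:
    """Run op(session), deciding after each failure whether to keep the session."""
    session = pool.get()
    attempt = 0
    while True:
        try:
            result = op(session)
        except RetryableError as err:
            broken = err.bad_session
            if attempt >= max_retries:
                if not broken:
                    pool.put(session)
                raise
            attempt += 1
            if broken:
                session = pool.get()  # never reuse or return a broken session
            continue
        except Exception:
            pool.put(session)  # assumption: a non-retryable error leaves the session usable
            raise
        pool.put(session)  # healthy session goes back to the pool
        return result


pool = SessionPool()
seen = []


def op(session: int) -> str:
    seen.append(session)
    if len(seen) == 1:
        raise RetryableError("BAD_SESSION", bad_session=True)  # switch sessions
    if len(seen) == 2:
        raise RetryableError("OVERLOADED")  # keep the same session
    return "ok"


print(do(pool, op), seen)  # ok [1, 2, 2]
```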
Repeat attempts in case of failed YDB interactive transaction objects
To retry operations at the level of a YDB table service interactive transaction, you can use the db.Table().DoTx(ctx, txOp) function, which provides a transaction in a prepared YDB session for query execution.
db.Table().DoTx(ctx, txOp) uses the retry package and tracks the lifetime of the YDB sessions.
Based on its signature, the user's operation txOp must return an error or nil so that the driver can "decide" what to do based on the error type: whether to retry the operation, with or without a delay, and in this transaction or a new one.
The user can affect the retry logic using the context and the idempotency flag, while the YDB Go SDK interprets errors returned by txOp.
Example of the code that uses the db.Table().DoTx(ctx, txOp) function:
err := db.Table().DoTx(ctx, func(ctx context.Context, tx table.TransactionActor) error {
	_, err := tx.Execute(ctx,
		"DECLARE $id AS Int64; INSERT INTO test (id, val) VALUES($id, 'asd')",
		table.NewQueryParameters(table.ValueParam("$id", types.Int64Value(100500))),
	)
	return err
}, table.WithIdempotent())
if err != nil {
	return err
}
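At the transaction level, a retry re-runs the whole operation in a new transaction, so writes from a failed attempt never reach the database. The following Python sketch models this with a toy transaction that buffers writes until commit; Tx, do_tx, and the simulated commit conflict are hypothetical and not the SDK's API.

```python
from typing import Callable, Dict


class RetryableError(Exception):
    """Hypothetical retryable error, e.g. a conflict detected at commit time."""


class Tx:
    """Toy transaction: writes are buffered and applied only on commit."""

    def __init__(self, store: Dict[int, str]) -> None:
        self.store = store
        self.writes: Dict[int, str] = {}

    def upsert(self, key: int, value: str) -> None:
        self.writes[key] = value

    def commit(self) -> None:
        self.store.update(self.writes)


def do_tx(store: Dict[int, str], op: Callable[[Tx], None],
          begin: Callable[[Dict[int, str]], Tx] = Tx, max_retries: int = 5) -> None:
    """Run op in a new transaction per attempt and commit it."""
    attempt = 0
    while True:
        tx = begin(store)  # every attempt starts a brand-new transaction
        try:
            op(tx)
            tx.commit()
            return
        except RetryableError:
            # The failed transaction's buffered writes are dropped (rollback).
            if attempt >= max_retries:
                raise
            attempt += 1


class ConflictOnFirstCommit(Tx):
    """Simulates a commit that fails once, as with a lock conflict."""
    failures_left = 1

    def commit(self) -> None:
        if ConflictOnFirstCommit.failures_left > 0:
            ConflictOnFirstCommit.failures_left -= 1
            raise RetryableError("commit conflict")
        super().commit()


store: Dict[int, str] = {}
runs = []


def insert_row(tx: Tx) -> None:
    runs.append(len(runs) + 1)
    tx.upsert(100500, "asd")


do_tx(store, insert_row, begin=ConflictOnFirstCommit)
print(store, len(runs))  # {100500: 'asd'} 2
```

Because the operation may run more than once, it should not have side effects outside the transaction (such as sending notifications) unless those side effects are idempotent too.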
Queries to other YDB services (db.Scripting(), db.Scheme(), db.Coordination(), db.Ratelimiter(), db.Discovery()) also use the retry.Retry function internally to execute retries and don't require external auxiliary functions for them.
The standard database/sql package uses its own internal retry logic, based on the errors returned by a specific driver implementation.
For example, the database/sql code typically applies a three-attempt retry policy:
- Two attempts on an existing connection, or on a new one if the database/sql connection pool is empty.
- One attempt on a new connection.
This retry policy is usually enough to survive temporary unavailability of YDB nodes or issues with a YDB session.
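The three-attempt policy can be sketched as follows. This Python model is based on the behavior of Go's database/sql package, which repeats an operation only when the driver reports driver.ErrBadConn; ConnPool, BadConnError, and with_bad_conn_retries are hypothetical names used for illustration, and any other error is returned immediately.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


class BadConnError(Exception):
    """Stands in for Go's driver.ErrBadConn: the connection is unusable."""


class ConnPool:
    """Toy connection pool; new connections get ids starting from 1001."""

    def __init__(self, idle: List[int]) -> None:
        self.idle = list(idle)
        self.opened = 0

    def cached_or_new(self) -> int:
        return self.idle.pop() if self.idle else self.new()

    def new(self) -> int:
        self.opened += 1
        return 1000 + self.opened


def with_bad_conn_retries(pool: ConnPool, op: Callable[[int], T],
                          max_bad_conn_retries: int = 2) -> T:
    """Two attempts on a pooled (or new) connection, then one on a new one."""
    for _ in range(max_bad_conn_retries):
        try:
            return op(pool.cached_or_new())
        except BadConnError:
            continue  # the bad connection is dropped; try again
    return op(pool.new())  # final attempt always opens a new connection


# Usage: both pooled connections are stale, the freshly opened one works.
pool = ConnPool(idle=[1, 2])
used = []


def query(conn: int) -> str:
    used.append(conn)
    if conn < 1000:
        raise BadConnError(f"connection {conn} is broken")
    return "row"


print(with_bad_conn_retries(pool, query), used)  # row [2, 1, 1001]
```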
The YDB Go SDK provides special functions to ensure execution of a user's operation:
Repeat attempts in case of failed *sql.Conn connection objects:
To retry operations on *sql.Conn connection objects, you can use the auxiliary retry.Do(ctx, db, op) function, which provides a prepared *sql.Conn connection for query execution.
You need to pass the context, the database object, and the user's operation to the retry.Do function.
The user's code can affect the retry logic using the context and the idempotency flag, while the YDB Go SDK, in turn, interprets errors returned by op.
The user's op operation must return an error or nil:
- If the custom function returns nil, retries stop.
- If the custom function returns an error, the YDB Go SDK tries to identify this error and performs retries depending on it.
Example of the code that uses the retry.Do function:
import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"github.com/ydb-platform/ydb-go-sdk/v3/retry"
)
func main() {
	...
	err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) (err error) {
		row := cc.QueryRowContext(ctx, `
			PRAGMA TablePathPrefix("/local");
			DECLARE $seriesID AS Uint64;
			DECLARE $seasonID AS Uint64;
			DECLARE $episodeID AS Uint64;
			SELECT views FROM episodes WHERE series_id = $seriesID AND season_id = $seasonID AND episode_id = $episodeID;
		`,
			sql.Named("seriesID", uint64(1)),
			sql.Named("seasonID", uint64(1)),
			sql.Named("episodeID", uint64(1)),
		)
		var views sql.NullFloat64
		if err = row.Scan(&views); err != nil {
			return fmt.Errorf("cannot scan views: %w", err)
		}
		if !views.Valid {
			return fmt.Errorf("unexpected invalid views: %v", views)
		}
		log.Printf("views = %v", views.Float64)
		return row.Err()
	}, retry.WithDoRetryOptions(retry.WithIdempotent(true)))
	if err != nil {
		log.Printf("retry.Do failed: %v\n", err)
	}
}
Repeat attempts in case of failed *sql.Tx interactive transaction objects:
To retry operations on *sql.Tx interactive transaction objects, you can use the auxiliary retry.DoTx(ctx, db, op) function, which provides a prepared *sql.Tx transaction for query execution.
You need to pass the context, the database object, and the user's operation to the retry.DoTx function.
The op function receives a prepared *sql.Tx transaction, in which queries to YDB should be executed.
The user's code can affect the retry logic using the context and the operation idempotency flag, while the YDB Go SDK, in turn, interprets errors returned by op.
The user's op operation must return an error or nil:
- If the custom function returns nil, retries stop.
- If the custom function returns an error, the YDB Go SDK tries to identify this error and performs retries depending on it.
By default, retry.DoTx runs a read-write transaction with the sql.LevelDefault isolation level; you can change this with the retry.WithTxOptions option.
Example of the code that uses the retry.DoTx function:
import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"github.com/ydb-platform/ydb-go-sdk/v3/retry"
)
func main() {
	...
	err = retry.DoTx(ctx, db, func(ctx context.Context, tx *sql.Tx) error {
		row := tx.QueryRowContext(ctx, `
			PRAGMA TablePathPrefix("/local");
			DECLARE $seriesID AS Uint64;
			DECLARE $seasonID AS Uint64;
			DECLARE $episodeID AS Uint64;
			SELECT views FROM episodes WHERE series_id = $seriesID AND season_id = $seasonID AND episode_id = $episodeID;
		`,
			sql.Named("seriesID", uint64(1)),
			sql.Named("seasonID", uint64(1)),
			sql.Named("episodeID", uint64(1)),
		)
		var views sql.NullFloat64
		if err := row.Scan(&views); err != nil {
			return fmt.Errorf("cannot select current views: %w", err)
		}
		if !views.Valid {
			return fmt.Errorf("unexpected invalid views: %v", views)
		}
		log.Printf("views = %v", views.Float64)
		if views.Float64 != 1 {
			return fmt.Errorf("unexpected views value: %v", views)
		}
		return nil
	}, retry.WithDoTxRetryOptions(retry.WithIdempotent(true)), retry.WithTxOptions(&sql.TxOptions{
		Isolation: sql.LevelSnapshot,
		ReadOnly:  true,
	}))
	if err != nil {
		log.Printf("do tx failed: %v\n", err)
	}
}
In the YDB Java SDK, retries are implemented by the SessionRetryContext helper class. This class is constructed with the SessionRetryContext.create method, to which you pass an implementation of the SessionSupplier interface (usually an instance of the TableClient class or the QueryClient class).
Additionally, the user can specify some other options:
- maxRetries(int maxRetries): the maximum number of operation retries, not counting the first execution. Default value: 10.
- retryNotFound(boolean retryNotFound): the option to retry operations that returned the NOT_FOUND status. Enabled by default.
- idempotent(boolean idempotent): indicates idempotence of operations. Idempotent operations will be retried for a broader range of errors. Disabled by default.
The SessionRetryContext class provides two methods to run operations with retries.
- CompletableFuture<Status> supplyStatus: executes an operation that returns a status. As an argument, it accepts the lambda Function<Session, CompletableFuture<Status>> fn.
- CompletableFuture<Result<T>> supplyResult: executes an operation that returns data. As an argument, it accepts the lambda Function<Session, CompletableFuture<Result<T>>> fn.
When using the SessionRetryContext class, keep in mind that the operation is retried in the following cases:
- The lambda function returned a retryable error code.
- The lambda function threw an UnexpectedResultException with a retryable error code.
Sample code using SessionRetryContext.supplyStatus:
private void createTable(TableClient tableClient, String database, String tableName) {
    SessionRetryContext retryCtx = SessionRetryContext.create(tableClient).build();
    TableDescription pets = TableDescription.newBuilder()
            .addNullableColumn("species", PrimitiveType.Text)
            .addNullableColumn("name", PrimitiveType.Text)
            .addNullableColumn("color", PrimitiveType.Text)
            .addNullableColumn("price", PrimitiveType.Float)
            .setPrimaryKeys("species", "name")
            .build();
    String tablePath = database + "/" + tableName;
    retryCtx.supplyStatus(session -> session.createTable(tablePath, pets))
            .join().expectSuccess();
}
Sample code using SessionRetryContext.supplyResult:
private void selectData(TableClient tableClient, String tableName) {
    SessionRetryContext retryCtx = SessionRetryContext.create(tableClient).build();
    String selectQuery
            = "DECLARE $species AS Text;"
            + "DECLARE $name AS Text;"
            + "SELECT * FROM " + tableName + " "
            + "WHERE species = $species AND name = $name;";
    Params params = Params.of(
            "$species", PrimitiveValue.newText("cat"),
            "$name", PrimitiveValue.newText("Tom")
    );
    DataQueryResult data = retryCtx
            .supplyResult(session -> session.executeDataQuery(selectQuery, TxControl.onlineRo(), params))
            .join().getValue();
    ResultSetReader rsReader = data.getResultSet(0);
    logger.info("Result of select query:");
    while (rsReader.next()) {
        logger.info("  species: {}, name: {}, color: {}, price: {}",
                rsReader.getColumn("species").getText(),
                rsReader.getColumn("name").getText(),
                rsReader.getColumn("color").getText(),
                rsReader.getColumn("price").getFloat()
        );
    }
}
Retries at the SessionRetryContext level apply to the native API (TableClient / QueryClient). When using JDBC, implement retries at the application level, or use the native transport and clients as described in Driver initialization.
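The two ways a SessionRetryContext operation can report a retryable failure (a returned status or a thrown UnexpectedResultException), together with the retryNotFound and idempotent options, can be modeled in Python as follows. supply_status, UnexpectedResultError, and the status sets are hypothetical; in particular, which codes count as retryable is an assumption here, and for simplicity the sketch always returns the final status instead of completing exceptionally.

```python
from typing import Callable

# Hypothetical status sets, for illustration only.
RETRYABLE = {"ABORTED", "UNAVAILABLE", "OVERLOADED", "BAD_SESSION"}
IDEMPOTENT_ONLY = {"UNDETERMINED", "TIMEOUT"}


class UnexpectedResultError(Exception):
    """Plays the role of UnexpectedResultException: carries a status code."""

    def __init__(self, status: str) -> None:
        super().__init__(status)
        self.status = status


def is_retryable(status: str, idempotent: bool, retry_not_found: bool) -> bool:
    if status == "NOT_FOUND":
        return retry_not_found
    return status in RETRYABLE or (idempotent and status in IDEMPOTENT_ONLY)


def supply_status(fn: Callable[[], str], max_retries: int = 10,
                  idempotent: bool = False, retry_not_found: bool = True) -> str:
    """Retry fn whether it reports failure by returning or by raising a status."""
    retries = 0
    while True:
        try:
            status = fn()          # failure channel 1: a returned status
        except UnexpectedResultError as exc:
            status = exc.status    # failure channel 2: a raised status
        if status == "SUCCESS" or not is_retryable(status, idempotent, retry_not_found):
            return status
        if retries >= max_retries:  # max_retries excludes the first execution
            return status
        retries += 1


calls = []


def create_table() -> str:
    calls.append(1)
    if len(calls) == 1:
        return "OVERLOADED"                          # returned retryable status
    if len(calls) == 2:
        raise UnexpectedResultError("UNAVAILABLE")   # raised retryable status
    return "SUCCESS"


print(supply_status(create_table), len(calls))  # SUCCESS 3
```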
In the YDB Python SDK, retries are implemented in QuerySessionPool using the RetrySettings class. RetrySettings supports:
- max_retries: maximum number of retries (default 10).
- idempotent: whether the operation is idempotent; idempotent operations are retried for a broader set of errors (default False).
- backoff_ceiling, backoff_slot_duration: parameters for exponential backoff.
- fast_backoff_settings, slow_backoff_settings: fast and slow retry tuning.
For queries with retries, QuerySessionPool provides retry_operation_sync and execute_with_retries. Use execute_with_retries for one-off queries in implicit transaction mode. For explicit transactions or multiple operations in one transaction, use retry_operation_sync.
Example using execute_with_retries:
import ydb
def execute_query(pool: ydb.QuerySessionPool):
    result_sets = pool.execute_with_retries(
        "SELECT series_id, title FROM series WHERE series_id = 1;",
        retry_settings=ydb.RetrySettings(idempotent=True),
    )
    # ...
Example using retry_operation_sync:
import ydb
def execute_query(pool: ydb.QuerySessionPool):
    def callee(session: ydb.QuerySession):
        with session.transaction().execute(
            "SELECT 1",
            commit_tx=True,
        ) as result_sets:
            pass
    result = pool.retry_operation_sync(
        callee,
        retry_settings=ydb.RetrySettings(max_retries=20, idempotent=True),
    )
    # ...
Asynchronous example using execute_with_retries:
import ydb
async def execute_query(pool: ydb.aio.QuerySessionPool):
    result_sets = await pool.execute_with_retries(
        "SELECT series_id, title FROM series WHERE series_id = 1;",
        retry_settings=ydb.RetrySettings(idempotent=True),
    )
    # ...
Asynchronous example using retry_operation_async:
import ydb
async def execute_query(pool: ydb.aio.QuerySessionPool):
    async def callee(session):
        async with session.transaction(tx_mode=ydb.QuerySerializableReadWrite()) as tx:
            async with await tx.execute("SELECT 1", commit_tx=True) as result_sets:
                pass
    await pool.retry_operation_async(
        callee,
        retry_settings=ydb.RetrySettings(max_retries=20, idempotent=True),
    )
    # ...
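The backoff_slot_duration and backoff_ceiling parameters suggest the usual exponential backoff scheme: the delay window doubles with each attempt until it reaches a ceiling, and part of the delay is randomized so that many clients do not retry in lockstep. The Python sketch below shows that scheme; backoff_delay and its jitter parameter are hypothetical, and the SDK's exact formula may differ.

```python
import random
from typing import Optional


def backoff_delay(attempt: int, slot_duration: float = 0.05, ceiling: int = 6,
                  jitter: float = 0.5, rng: Optional[random.Random] = None) -> float:
    """Delay in seconds before retry number `attempt` (counting from 0).

    The window grows as 1, 2, 4, ... slots and stops growing at 2**ceiling
    slots; a `jitter` share of it is randomized.
    """
    if rng is None:
        rng = random.Random()
    window = slot_duration * (1 << min(attempt, ceiling))
    # Fixed part plus random part: the result lies between
    # window * (1 - jitter) and window.
    return window * (1 - jitter) + rng.random() * window * jitter


rng = random.Random(42)  # seeded so the printed sequence is reproducible
for attempt in range(8):
    print(attempt, round(backoff_delay(attempt, rng=rng), 3))
```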
When using YDB through SQLAlchemy, retries happen internally and are not configured from the outside.
Retries and reconnections are built into the SDK; you do not need to configure anything separately.
The retrier is available in the separate @ydbjs/retry package.
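import { retry } from '@ydbjs/retry'
// Example predicate (defined here for illustration): retry every thrown Error
const isError = (error) => error instanceof Error
let attempts = 0
const result = await retry({ retry: isError, budget: 3 }, async () => {
    // Fail on the first two attempts, then succeed
    if (attempts >= 2) {
        return 'success'
    }
    attempts++
    throw new Error('test error')
})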