Working with coordination nodes
This article describes how to use the YDB SDK to coordinate the work of multiple client application instances using coordination nodes and their semaphores.
Creating a coordination node
Coordination nodes are created in YDB databases in the same namespace as other schema objects, such as tables and topics.
err := db.Coordination().CreateNode(ctx,
"/path/to/mynode",
)
TClient client(driver);
auto status = client
.CreateNode("/path/to/mynode")
.ExtractValueSync();
Y_ABORT_UNLESS(status.IsSuccess());
When creating a node, you can optionally pass a TNodeSettings structure with the following settings:
- ReadConsistencyMode: defaults to RELAXED, which allows potentially outdated values to be read during leader transitions. You can optionally enable STRICT mode, in which all reads go through the consensus algorithm and are guaranteed to return the most recent value, at a higher cost.
- AttachConsistencyMode: defaults to STRICT, which requires the consensus algorithm for session recovery. You can optionally enable RELAXED mode, which lets sessions recover after failures without going through consensus. This mode may be necessary with a very large number of clients. Overall correctness is preserved, but reads during leader transitions may be outdated, and sessions may expire in problematic scenarios.
- SelfCheckPeriod (default 1 second): the interval at which the service checks its own liveness. Changing this setting is not recommended except in special circumstances.
  - A larger value reduces server load but increases the delay before a leader change is detected and the service is informed.
  - A smaller value increases server load and detects problems faster, but may produce false positives, where the service incorrectly identifies a problem.
- SessionGracePeriod (default 10 seconds): the period during which a new leader refrains from closing existing open sessions, extending their validity.
  - A smaller value shortens the window during which sessions of clients that no longer exist (and could not report this during the leader change) keep holding semaphores and blocking other clients.
  - A smaller value also increases the likelihood of false positives, where a live leader terminates itself as a precaution because it cannot be sure whether this period has already ended on a potential new leader.
  - This value must be strictly greater than SelfCheckPeriod.
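The only hard constraint among the node settings is that SessionGracePeriod must be strictly greater than SelfCheckPeriod. The sketch below models the settings and their documented defaults as a plain Go struct so the rule can be checked before a node is created. NodeSettings, ConsistencyMode, DefaultNodeSettings and Validate are hypothetical names for illustration; they are not part of the YDB Go SDK.

```go
package main

import (
	"fmt"
	"time"
)

// ConsistencyMode is a hypothetical Go stand-in for the RELAXED/STRICT modes.
type ConsistencyMode int

const (
	Relaxed ConsistencyMode = iota
	Strict
)

// NodeSettings is a hypothetical mirror of TNodeSettings, not an SDK type.
type NodeSettings struct {
	ReadConsistencyMode   ConsistencyMode
	AttachConsistencyMode ConsistencyMode
	SelfCheckPeriod       time.Duration
	SessionGracePeriod    time.Duration
}

// DefaultNodeSettings returns the defaults described in the text.
func DefaultNodeSettings() NodeSettings {
	return NodeSettings{
		ReadConsistencyMode:   Relaxed,
		AttachConsistencyMode: Strict,
		SelfCheckPeriod:       1 * time.Second,
		SessionGracePeriod:    10 * time.Second,
	}
}

// Validate checks the documented rule SessionGracePeriod > SelfCheckPeriod.
func (s NodeSettings) Validate() error {
	if s.SessionGracePeriod <= s.SelfCheckPeriod {
		return fmt.Errorf("SessionGracePeriod (%v) must be strictly greater than SelfCheckPeriod (%v)",
			s.SessionGracePeriod, s.SelfCheckPeriod)
	}
	return nil
}

func main() {
	s := DefaultNodeSettings()
	fmt.Println(s.Validate()) // <nil>

	s.SessionGracePeriod = s.SelfCheckPeriod // violates the rule
	fmt.Println(s.Validate())
}
```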
Working with sessions
Creating a session
To start working with coordination nodes, a client must establish a session within which it will perform all operations with the coordination node.
session, err := db.Coordination().CreateSession(ctx,
"/path/to/mynode", // Coordination Node name in the database
)
TClient client(driver);
const TSession& session = client
.StartSession("/path/to/mynode")
.ExtractValueSync()
.ExtractResult();
When establishing a session, you can optionally pass a TSessionSettings structure with the following settings:
- Description: a text description of the session. It is displayed in internal interfaces and can be useful for diagnosing problems.
- OnStateChanged: called on significant changes during the session's life, with the corresponding state:
  - ATTACHED: the session is connected and operating normally.
  - DETACHED: the session has temporarily lost its connection to the service but can still be restored.
  - EXPIRED: the session has lost its connection to the service and cannot be restored.
- OnStopped: called when the session stops trying to restore its connection to the service. This can be useful for establishing a new connection.
- Timeout: the maximum time during which the session can be restored after losing its connection to the service.
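An OnStateChanged handler usually maps each state to an action. The sketch below shows one conservative policy: keep working while ATTACHED, pause critical work while DETACHED (the session may still recover), and start over with a new session once EXPIRED. The Go type and constant names are hypothetical, and the pause-on-DETACHED policy is a design choice assumed here, not a documented requirement.

```go
package main

import "fmt"

// SessionState mirrors the states passed to OnStateChanged (hypothetical Go names).
type SessionState int

const (
	Attached SessionState = iota
	Detached
	Expired
)

func (s SessionState) String() string {
	switch s {
	case Attached:
		return "ATTACHED"
	case Detached:
		return "DETACHED"
	case Expired:
		return "EXPIRED"
	}
	return "UNKNOWN"
}

// Action is what a hypothetical worker does after a state change.
type Action string

const (
	Continue  Action = "continue"    // session healthy: keep working
	Pause     Action = "pause"       // connection lost but recoverable: hold off critical work
	Reconnect Action = "new-session" // session gone: semaphores may now be held by others
)

// onStateChanged implements the conservative policy described above.
func onStateChanged(s SessionState) Action {
	switch s {
	case Attached:
		return Continue
	case Detached:
		return Pause
	default:
		return Reconnect
	}
}

func main() {
	for _, s := range []SessionState{Attached, Detached, Attached, Expired} {
		fmt.Printf("%s -> %s\n", s, onStateChanged(s))
	}
}
```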
Session control
It's important for the client application to monitor the session state, because it can rely on the state of acquired semaphores only while the session is alive. Once the session ends, whether at the client's or the server's initiative, the client can no longer assume that other clients in the cluster have not acquired its semaphores and changed their state.
In the Go SDK, such situations are tracked with the session context, session.Context(), which ends together with the session. The SDK handles transport-level errors on its own and re-establishes the connection to the service, trying to restore the session while that is still possible. As a result, the client only needs to monitor the session context to react promptly to the loss of the session.
In the C++ SDK, an active session continuously maintains the connection to the YDB cluster and automatically re-establishes it in the background.
Working with semaphores
Creating a semaphore
When creating a semaphore, you can specify its limit. The limit is the maximum value the semaphore can reach. Calls that would increase the semaphore value above the limit wait until their increase can be satisfied without exceeding it.
err := session.CreateSemaphore(ctx,
"my-semaphore", // semaphore name
10, // semaphore limit
)
session
.CreateSemaphore(
"my-semaphore", // semaphore name
10 // semaphore limit
)
.ExtractValueSync()
.ExtractResult();
You can also pass a string that is stored with the semaphore and returned when the semaphore is acquired:
session
.CreateSemaphore(
"my-semaphore", // semaphore name
10, // semaphore limit
"my-data" // semaphore data
)
.ExtractValueSync()
.ExtractResult();
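The limit semantics can be illustrated with an in-process counting semaphore: an acquisition that would push the value above the limit waits until enough units are released, and the data stored at creation is handed back on acquisition. The sketch below is a local model built on sync.Cond; localSemaphore and its methods are hypothetical names, and nothing here talks to a YDB cluster.

```go
package main

import (
	"fmt"
	"sync"
)

// localSemaphore is an in-process model of a semaphore limit. It is not the
// YDB SDK and does not talk to a cluster.
type localSemaphore struct {
	mu    sync.Mutex
	cond  *sync.Cond
	limit uint64
	count uint64
	data  string // stored at creation, returned on acquisition
}

func newLocalSemaphore(limit uint64, data string) *localSemaphore {
	s := &localSemaphore{limit: limit, data: data}
	s.cond = sync.NewCond(&s.mu)
	return s
}

// Acquire waits until adding n keeps the value within the limit, then adds it
// and returns the semaphore data. A request with n > limit would wait forever
// here; this sketch does not model how the service treats such requests.
func (s *localSemaphore) Acquire(n uint64) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	for s.count+n > s.limit {
		s.cond.Wait()
	}
	s.count += n
	return s.data
}

// Release returns n units and wakes all waiters so they can recheck the limit.
func (s *localSemaphore) Release(n uint64) {
	s.mu.Lock()
	s.count -= n
	s.mu.Unlock()
	s.cond.Broadcast()
}

// Count reports the current value.
func (s *localSemaphore) Count() uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.count
}

func main() {
	sem := newLocalSemaphore(10, "my-data")
	fmt.Println(sem.Acquire(5)) // my-data
	sem.Acquire(5)              // the value is now at the limit of 10

	done := make(chan struct{})
	go func() {
		sem.Acquire(3) // 10+3 exceeds the limit, so this waits
		close(done)
	}()

	sem.Release(5) // frees room, so the waiter can proceed
	<-done
	fmt.Println(sem.Count()) // 8
}
```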
Acquiring a semaphore
To acquire a semaphore, the client calls the AcquireSemaphore method and waits for a special Lease object. This object confirms that the semaphore value was successfully increased, and the client can rely on this until it explicitly releases the semaphore or until the session in which the confirmation was received ends.
lease, err := session.AcquireSemaphore(ctx,
"my-semaphore", // semaphore name
5, // value to increase semaphore by
)
Like the session, the Lease object has a context, which ends at either of these moments: when the semaphore is released or when the session ends.
To cancel waiting for the semaphore, cancel the context ctx passed to AcquireSemaphore.
session
.AcquireSemaphore(
"my-semaphore", // semaphore name
TAcquireSemaphoreSettings().Count(5) // value to increase semaphore by
)
.ExtractValueSync()
.ExtractResult();
When acquiring a semaphore, you can optionally pass a TAcquireSemaphoreSettings structure with the following settings:
- Count: the value by which the semaphore is increased upon acquisition.
- Data: additional data that can be attached to the semaphore.
- OnAccepted: called when the operation is queued, for example, because the semaphore couldn't be acquired immediately.
  - It isn't called if the semaphore is acquired immediately.
  - Keep in mind that this call can happen concurrently with the TFuture result.
- Timeout: the maximum time the operation can stay queued on the server.
  - The operation returns false if the semaphore couldn't be acquired within Timeout after the operation was queued.
  - With Timeout set to 0, the operation works like TryAcquire: either the semaphore is acquired atomically and the operation returns true, or the operation returns false without being queued.
- Ephemeral: if true, the name refers to an ephemeral semaphore. Such semaphores are created automatically on the first Acquire and deleted automatically on the last Release.
- Shared(): an alias for setting Count = 1, which acquires the semaphore in shared mode.
- Exclusive(): an alias for setting Count = max, which acquires the semaphore in exclusive mode (for semaphores created with the limit Max<ui64>()).
The acquired value of a semaphore can be decreased (but not increased) by calling the AcquireSemaphore method again with a smaller value.
Updating semaphore data
Using the UpdateSemaphore
method, you can update (replace) the semaphore data that was attached during its creation.
err := session.UpdateSemaphore(ctx,
"my-semaphore", // semaphore name
options.WithUpdateData([]byte("updated-data")), // new semaphore data
)
session
.UpdateSemaphore(
"my-semaphore", // semaphore name
"updated-data" // new semaphore data
)
.ExtractValueSync()
.ExtractResult();
This call doesn't require the semaphore to be acquired and doesn't acquire it. If only one specific client should update the data, you must ensure this explicitly, for example, by acquiring the semaphore, updating the data, and then releasing the semaphore.
Getting semaphore data
description, err := session.DescribeSemaphore(ctx,
"my-semaphore", // semaphore name
options.WithDescribeOwners(true), // to get list of owners
options.WithDescribeWaiters(true), // to get list of waiters
)
session
.DescribeSemaphore(
"my-semaphore" // semaphore name
)
.ExtractValueSync()
.ExtractResult();
When getting information about a semaphore, you can optionally pass a TDescribeSemaphoreSettings structure with the following settings:
- OnChanged: called once after the data changes on the server. It takes a bool parameter: true means the call was caused by a change; false means it is a false call, and you need to repeat DescribeSemaphore to restore the subscription.
- WatchData: call OnChanged when the semaphore data changes.
- WatchOwners: call OnChanged when the semaphore owners change.
- IncludeOwners: return the list of owners in the result.
- IncludeWaiters: return the list of waiters in the result.
The call result is a structure with the following fields:
- Name: the semaphore name.
- Data: the semaphore data.
- Count: the current value of the semaphore.
- Limit: the limit specified when the semaphore was created.
- Owners: the list of semaphore owners.
- Waiters: the list of clients waiting in the semaphore queue.
- Ephemeral: whether the semaphore is ephemeral.
The Owners and Waiters fields of the result contain lists of structures with the following fields:
- OrderId: the sequence number of the acquire operation on the semaphore. It can be used for identification: for example, if OrderId has changed, the session called ReleaseSemaphore and then a new AcquireSemaphore.
- SessionId: the identifier of the session that made this AcquireSemaphore call.
- Timeout: the timeout used in the AcquireSemaphore call for queued operations.
- Count: the value requested in AcquireSemaphore.
- Data: the data specified in AcquireSemaphore.
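The OrderId rule makes it possible to detect, from two consecutive descriptions, which sessions released and re-acquired the semaphore in between. The sketch below mirrors the documented result fields in Go structs and compares the Owners lists. The struct names, the uint64 type of the IDs and the time.Duration type of Timeout are assumptions for illustration; they are not the SDK's Go types.

```go
package main

import (
	"fmt"
	"time"
)

// semaphoreOwner mirrors the documented fields of an Owners/Waiters entry.
// Field types are assumed; this is not the SDK's Go type.
type semaphoreOwner struct {
	OrderID   uint64
	SessionID uint64
	Timeout   time.Duration
	Count     uint64
	Data      string
}

// semaphoreDescription mirrors the documented DescribeSemaphore result.
type semaphoreDescription struct {
	Name      string
	Data      string
	Count     uint64
	Limit     uint64
	Owners    []semaphoreOwner
	Waiters   []semaphoreOwner
	Ephemeral bool
}

// reacquired lists sessions that own the semaphore in both descriptions but
// whose OrderID changed, i.e. they released it and acquired it again.
func reacquired(prev, cur semaphoreDescription) []uint64 {
	before := make(map[uint64]uint64, len(prev.Owners)) // SessionID -> OrderID
	for _, o := range prev.Owners {
		before[o.SessionID] = o.OrderID
	}
	var out []uint64
	for _, o := range cur.Owners {
		if id, ok := before[o.SessionID]; ok && id != o.OrderID {
			out = append(out, o.SessionID)
		}
	}
	return out
}

func main() {
	prev := semaphoreDescription{Name: "my-semaphore", Limit: 10, Count: 7,
		Owners: []semaphoreOwner{{OrderID: 1, SessionID: 100, Count: 5}, {OrderID: 2, SessionID: 200, Count: 2}}}
	cur := prev
	cur.Owners = []semaphoreOwner{{OrderID: 1, SessionID: 100, Count: 5}, {OrderID: 3, SessionID: 200, Count: 2}}
	fmt.Println(reacquired(prev, cur)) // [200]
}
```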
Releasing a semaphore
To release a semaphore acquired in a session, call the Release method on the Lease object (Go SDK) or the ReleaseSemaphore method on the session (C++ SDK).
err := lease.Release()
session
.ReleaseSemaphore(
"my-semaphore" // semaphore name
)
.ExtractValueSync()
.ExtractResult();
Important implementation details
The AcquireSemaphore and ReleaseSemaphore operations are idempotent. Once AcquireSemaphore has been called on a semaphore, subsequent AcquireSemaphore calls only change the acquisition parameters. For example, if AcquireSemaphore is called with count=10, the operation may be queued. You can call AcquireSemaphore again with count=9, either before or after the acquisition succeeds, to reduce the number of acquired units. The new operation replaces the old one, which completes with the ABORTED code if it hasn't already completed successfully. The position in the queue stays the same even though one AcquireSemaphore operation replaces another.
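The replacement rule for a queued operation can be modeled with a simple wait queue: a second call from the same session swaps its entry in place, so the queue position is kept, and the earlier operation is marked ABORTED. The sketch below covers only the still-queued case and does not grant acquisitions; waitQueue, acquireOp and the status strings are hypothetical names.

```go
package main

import "fmt"

type opStatus string

const (
	pending opStatus = "PENDING"
	aborted opStatus = "ABORTED"
)

// acquireOp is one AcquireSemaphore call in this toy model.
type acquireOp struct {
	session string
	count   uint64
	status  opStatus
}

// waitQueue models only the replacement rule for repeated calls while an
// operation is still queued; it does not grant acquisitions.
type waitQueue struct {
	waiters []*acquireOp
}

// acquire queues a new operation or, if the session already has one queued,
// replaces it at the same position and marks the old one ABORTED.
func (q *waitQueue) acquire(session string, count uint64) *acquireOp {
	op := &acquireOp{session: session, count: count, status: pending}
	for i, w := range q.waiters {
		if w.session == session {
			w.status = aborted
			q.waiters[i] = op
			return op
		}
	}
	q.waiters = append(q.waiters, op)
	return op
}

func main() {
	q := &waitQueue{}
	first := q.acquire("A", 10)
	q.acquire("B", 1)
	second := q.acquire("A", 9) // replaces A's queued request

	fmt.Println(first.status, second.status) // ABORTED PENDING
	for _, w := range q.waiters {
		fmt.Println(w.session, w.count) // "A 9", then "B 1"
	}
}
```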
Both AcquireSemaphore and ReleaseSemaphore return a bool indicating whether the semaphore state was changed. For example, AcquireSemaphore may return false if the semaphore couldn't be acquired within the Timeout period because another client had acquired it. Similarly, ReleaseSemaphore may return false if the semaphore isn't acquired in the current session.
A queued AcquireSemaphore operation can be terminated early by calling ReleaseSemaphore. No matter how many AcquireSemaphore calls are made for a specific semaphore within one session, a single ReleaseSemaphore call releases it. Therefore, AcquireSemaphore and ReleaseSemaphore cannot be used like Acquire/Release on a recursive mutex.
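The difference from a recursive mutex is that repeated acquisitions do not stack: they overwrite the session's parameters, so one release is enough and a second release reports that nothing changed. The sketch below models this with a map from session to acquired count. holds, acquire and release are hypothetical names, and the rule that an acquired count may only decrease is deliberately not enforced.

```go
package main

import "fmt"

// holds maps a session ID to the count it has acquired on one semaphore.
// This is a toy model, not the SDK.
type holds map[string]uint64

// acquire sets the session's count. Repeated calls replace the parameters
// instead of stacking like a recursive mutex. The rule that an acquired count
// may only decrease is not enforced here.
func (h holds) acquire(session string, count uint64) {
	h[session] = count
}

// release drops the session's hold entirely and reports whether state changed.
func (h holds) release(session string) bool {
	if _, ok := h[session]; !ok {
		return false
	}
	delete(h, session)
	return true
}

func main() {
	h := holds{}
	h.acquire("A", 1)
	h.acquire("A", 1)
	h.acquire("A", 1) // three calls, still a single hold

	fmt.Println(h.release("A")) // true: one release frees it
	fmt.Println(h.release("A")) // false: nothing left to release
}
```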
A DescribeSemaphore call with the WatchData or WatchOwners flag set creates a subscription to changes of the semaphore. Any previous subscription to the same semaphore within the session is canceled, which triggers OnChanged(false) for it. It is advisable to ignore OnChanged calls from earlier DescribeSemaphore calls once a new, replacing call has been made, for example, by tracking the ID of the current call.
OnChanged(false) may be called not only when a new DescribeSemaphore call cancels the subscription, but also for other reasons, such as a temporary loss of connection between the gRPC client and server, a temporary loss of connection between the gRPC server and the current service leader, or a change of the service leader. This happens at the slightest suspicion that a notification might have been lost. To restore the subscription, the client code must issue a new DescribeSemaphore call and correctly handle the case where the result of the new call differs from the previous one (for example, if a notification was indeed lost).
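The two rules above, ignoring notifications from replaced calls and re-issuing DescribeSemaphore on OnChanged(false), can be combined in a small bookkeeping type keyed by a call ID. In the sketch below, the describe field is a hypothetical hook standing in for a DescribeSemaphore call with WatchData or WatchOwners set; the watcher type and its methods are illustrative, not SDK API. Treating OnChanged(true) as a cue to describe again is an assumption based on the notification firing only once.

```go
package main

import (
	"fmt"
	"sync"
)

// watcher keeps the bookkeeping for a DescribeSemaphore subscription.
// describe is a hypothetical hook that would issue DescribeSemaphore with
// WatchData/WatchOwners set and register the given OnChanged callback.
type watcher struct {
	mu       sync.Mutex
	callID   uint64
	describe func(onChanged func(changed bool))
	reads    int // number of describe calls issued
}

// subscribe issues a new describe call; any earlier call becomes stale.
func (w *watcher) subscribe() {
	w.mu.Lock()
	w.callID++
	id := w.callID
	w.reads++
	w.mu.Unlock()
	w.describe(func(changed bool) { w.onChanged(id, changed) })
}

// onChanged drops notifications from replaced calls. For the current call it
// describes again: on true to read the new state and re-arm the one-shot
// notification, on false to restore a subscription that may have been lost.
func (w *watcher) onChanged(id uint64, changed bool) {
	w.mu.Lock()
	stale := id != w.callID
	w.mu.Unlock()
	if stale {
		return
	}
	w.subscribe()
}

func main() {
	var callbacks []func(bool)
	w := &watcher{describe: func(cb func(bool)) { callbacks = append(callbacks, cb) }}

	w.subscribe()        // call 1
	w.subscribe()        // call 2 replaces call 1
	callbacks[0](false)  // cancellation notice for call 1: ignored
	callbacks[1](false)  // possible lost notification on call 2: resubscribe
	fmt.Println(w.reads) // 3
}
```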