Работа с узлами координации

Данная статья описывает как использовать YDB SDK для координации работы нескольких экземпляров клиентского приложения посредством использования узлов координации и находящихся в них семафоры.

Создание узла координации

Узлы координации создаются в базах данных YDB в том же пространстве имён, что и другие объекты схемы, такие как таблицы и топики.

err := db.Coordination().CreateNode(ctx,
    "/path/to/mynode",
)
TClient client(driver);
auto status = client
    .CreateNode("/path/to/mynode")
    .ExtractValueSync();
Y_ABORT_UNLESS(status.IsSuccess());

При создании можно опционально указать TNodeSettings со следующими настройками:

  • ReadConsistencyMode - по умолчанию RELAXED, что допускает чтение не самого свежего значения в случае смены лидера. Опционально можно включить STRICT режим чтения, при котором все чтения проходят через алгоритм консенсуса и гарантируют возврат самого свежего значения, но становятся существенно дороже.

  • AttachConsistencyMode - по умолчанию STRICT, что означает обязательное использование алгоритма консенсуса при восстановлении сессии. Опционально можно включить RELAXED режим восстановления сессии в случае сбоев, который отключает это требование. Расслабленный режим может потребоваться при очень большом количестве клиентов, позволяя восстанавливать сессию без прохождения через консенсус, что не влияет на общую корректность, но может усугублять чтение не самого свежего значения во время смены лидера, а также устаревание сессий в случае проблем.

  • SelfCheckPeriod (по умолчанию 1 секунда) - периодичность с которой сервис производит проверки собственной живости. Не рекомендуется менять за исключением особых случаев.

    • Чем больше указанное значение, тем меньше нагрузка на сервер, но тем дольше возможная задержка между сменой лидера и тем, насколько оперативно об этом узнает сам сервис.
    • Чем меньше указанное значение, тем больше нагрузка на сервер и большая оперативность в детектировании проблем, но возможна генерация false positive когда сервис ошибочно детектирует проблемы.
  • SessionGracePeriod (по умолчанию 10 секунд) - период, в течение которого новый лидер не закрывает открытые сессии, продлевая их.

    • Чем меньше значение, тем меньше окно, когда сессии от несуществующих клиентов, которые не успели сообщить о пропаже при смене лидера, будут удерживать семафоры и мешать другим клиентам.
    • Чем меньше значение, тем выше вероятность ложных срабатываний, когда живой лидер может завершить работу для перестраховки, так как не будет уверен, что этот период не закончился у нового лидера.
    • Должен быть строго больше, чем SelfCheckPeriod.

Работа с сессиями

Создание сессии

Для начала работы клиент должен установить сессию, в рамках которой он будет осуществлять все операции с узлом координации.

session, err := db.Coordination().CreateSession(ctx,
    "/path/to/mynode", // имя Coordination Node в базе
)
TClient client(driver);
const TSession& session = client
   .StartSession("/path/to/mynode")
   .ExtractValueSync()
   .ExtractResult();

При установке сессии можно опционально передать структуру TSessionSettings со следующими настройками:

  • Description - текстовое описацие сессии, отображается во внутренних интерфейсах и может быть полезно при диагностике проблем.

  • OnStateChanged - вызывается на важных изменениях в процессе жизни сессии, передавая соответствующее состояние:

    • ATTACHED - сессия подключена и работает в нормальном режиме;
    • DETACHED - сессия временно потеряла связь с сервисом, но ещё может быть восстановлена;
    • EXPIRED - сессия потеряла связь с сервисом и не может быть восстановлена.
  • OnStopped - вызывается, когда сессия прекращает попытки восстановить связь с сервисом, что может быть полезно для установления нового соединения.

  • Timeout - максимальный таймаут, в течение которого сессия может быть восстановлена после потери связи с сервисом.

Контроль завершения сессии

Клиентскому приложению необходимо следить за состоянием сессии, так как оно может полагаться на состояние захваченных семафоров только пока сессия активна. Когда сессия завершается по инициативе клиента или сервера, клиент больше не может быть уверен, что другие клиенты в кластере не захватили его семафоры и не изменили их состояние.

В Go SDK для отслеживания таких ситуаций используется контекст сессии session.Context(), который завершается вместе с сессией. SDK самостоятельно обрабатывает ошибки транспортного уровня и восстанавливает соединение с сервисом, пытаясь восстановить сессию, если это возможно. Таким образом, клиенту достаточно следить только за контекстом сессии, чтобы своевременно отреагировать на её потерю.

В C++ SDK установленная сессия в фоне поддерживает и автоматически восстанавливает связь с кластером YDB.

Работа с семафорами

Создание семафора

При создании семафора можно указать его лимит. Лимит определяет максимальное значение, на которое его можно увеличить. Вызовы, пытающиеся увеличить значение семафора выше этого лимита, начнут ждать, пока их запросы на увеличение смогут быть выполнены, так чтобы значение семафора не превышало его лимит.

err := session.CreateSemaphore(ctx,
    "my-semaphore", // semaphore name
    10              // semaphore limit
)
session
    .CreateSemaphore(
        "my-semaphore",  // semaphore name
        10               // semaphore limit
    )
    .ExtractValueSync()
    .ExtractResult();

Также при создании семафора можно передать строку, которая будет храниться вместе с семафором и возвращаться при его захвате:

session
    .CreateSemaphore(
        "my-semaphore",  // semaphore name
        10,              // semaphore limit
        "my-data"        // semaphore data
    )
    .ExtractValueSync()
    .ExtractResult();

Захват семафора

Чтобы захватить семафор, клиент должен вызвать метод AcquireSemaphore и дождаться получения специального объекта Lease. Этот объект представляет собой подтверждение о том, что значение семафора было успешно увеличено и может считаться таковым до явного отпускания такого семафора или завершения сессии, в которой такое подтверждение было получено.

lease, err := session.AcquireSemaphore(ctx,
    "my-semaphore",  // semaphore name
    5,              // value to increase semaphore by
)

Для отмены ожидания взятия семафора, достаточно отменить переданный в метод контекст ctx.

session
    .AcquireSemaphore(
        "my-semaphore",                       // semaphore name
        TAcquireSemaphoreSettings().Count(5)  // value to increase semaphore by
    )
    .ExtractValueSync()
    .ExtractResult();

При захвате можно опционально передать структуру TAcquireSemaphoreSettings со следующими настройками:

  • Count - значение, на которое увеличивается семафор при захвате.

  • Data - дополнительные данные, которые можно положить в семафор.

  • OnAccepted - вызывается, когда операция встаёт в очередь (например, если семафор невозможно было захватить сразу).

    • Не будет вызвано, если семафор захватывается сразу.
    • Важно учитывать, что вызов может произойти параллельно с результатом TFuture.
  • Timeout - максимальное время, в течение которого операция может пролежать в очереди на сервере.

    • Операция вернёт false, если за время Timeout после добавления в очередь не удалось захватить семафор.
    • При Timeout установленном в 0 операция по смыслу работает как TryAcquire, т.е. семафор будет либо захвачен атомарно и операция вернёт true, либо операция вернёт false без использования очередей.
  • Ephemeral - если true, то имя является эфемерным семафором, такие семафоры автоматически создаются при первом Acquire и автоматически удаляются с последним Release.

  • Shared() - алиас для выставления Count = 1, захват семафора в shared режиме.

  • Exclusive() - алиас для выставления Count = max, захват семафора в exclusive режиме (для семафоров, созданных с лимитом Max<ui64>()).

Взятое значение захваченного семафора можно снизить (но не увеличить), вновь вызвав для него метод AcquireSemaphore с меньшим значением.

Обновление данных семафора

С помощью метода UpdateSemaphore можно обновить (заменить) данные семафора, которые были привязаны при его создании.

err := session.UpdateSemaphore(
    "my-semaphore",                                                          // semaphore name
    options.WithUpdateData([]byte("updated-data")),   // new semaphore data
)
session
    .UpdateSemaphore(
        "my-semaphore",  // semaphore name
        "updated-data"   // new semaphore data
    )
    .ExtractValueSync()
    .ExtractResult();

Этот вызов не требует захвата семафора и не приводит к нему. Если требуется, чтобы данные обновлял только один конкретный клиент, то это необходимо явным образом обеспечить, например, захватив семафор, обновив данные и отпустив семафор обратно.

Получение данных семафора

description, err := session.DescribeSemaphore(
    "my-semaphore"                                // semaphore name
    options.WithDescribeOwners(true), // to get list of owners
    options.WithDescribeWaiters(true), // to get list of waiters
)
session
    .DescribeSemaphore(
        "my-semaphore"  // semaphore name
    )
    .ExtractValueSync()
    .ExtractResult();

При получении информации о семафоре можно опционально передать структуру TDescribeSemaphoreSettings со следующими настройками:

  • OnChanged - вызывается один раз после изменения данных на сервере. C параметром bool, если true - то вызов произошёл из-за каких-то изменений, если false - то это ложный вызов и необходимо повторить DescribeSemaphore для восстановления подписки.
  • WatchData - вызывать OnChanged в случае изменения данных семафора.
  • WatchOwners - вызывать OnChanged в случае изменения владельцев семафора.
  • IncludeOwners - вернуть список владельцев в результатах.
  • IncludeWaiters - вернуть список ожидающих в результатах.

Результат вызова представляет собой структуру со следующими полями:

  • Name - имя семафора.
  • Data - данные семафора.
  • Count - текущее значение семафора.
  • Limit - максимальное количество токенов, указанное при создании семафора.
  • Owners - список владельцев семафора.
  • Waiters - список ожидающих в очереди на семафоре.
  • Ephemeral - является ли семафор эфемерным.

Поля Owners и Waiters в результате представляют собой список структур со следующими полями:

  • OrderId - порядковый номер операции захвата на семафоре. Может использоваться для идентификации, например если OrderId изменился, значит сессия сделала ReleaseSemaphore и новый AcquireSemaphore.
  • SessionId - идентификатор сессии, которая делала данный AcquireSemaphore.
  • Timeout - таймаут, с которым вызывался AcquireSemaphore для операций в очереди.
  • Count - запрошенное в AcquireSemaphore значение.
  • Data - данные, которые были указаны в AcquireSemaphore.

Освобождение семафора

Чтобы отпустить захваченный в сессии семафор, необходимо вызвать метод Release у объекта Lease.

err := lease.Release()
session
    .ReleaseSemaphore(
        "my-semaphore"  // semaphore name
    )
    .ExtractValueSync()
    .ExtractResult();

Важные особенности

Операции AcquireSemaphore и ReleaseSemaphore являются идемпотентными. Если на семафоре был вызван AcquireSemaphore, повторные вызовы AcquireSemaphore изменяют только параметры захвата. Например, вызов AcquireSemaphore с count=10 может добавить операцию в очередь. До или после успешного захвата можно повторно вызвать AcquireSemaphore с count=9, уменьшая количество захваченных единиц; новая операция заменит старую (которая завершится с кодом ABORTED, если она ещё не была успешно завершена). Позиция в очереди при этом не изменяется, несмотря на замену одной операции AcquireSemaphore на другую.

Операции AcquireSemaphore и ReleaseSemaphore возвращают bool, указывающий, изменила ли операция состояние семафора. Например, AcquireSemaphore вернёт false, если захват семафора не удался в течение времени Timeout, так как он был захвачен другим. Операция ReleaseSemaphore может вернуть false, если семафор не захвачен в текущей сессии.

Операцию AcquireSemaphore, находящуюся в очереди, можно завершить досрочно, вызвав ReleaseSemaphore. Независимо от количества вызовов AcquireSemaphore для конкретного семафора в одной сессии, освобождение происходит одним вызовом ReleaseSemaphore, то есть операции AcquireSemaphore и ReleaseSemaphore нельзя использовать как аналог Acquire или Release на рекурсивном мьютексе.

Операция DescribeSemaphore с флагами WatchData или WatchOwners создаёт подписку на изменения семафора. Любая более старая подписка на тот же семафор в сессии отменяется, вызывая OnChanged(false). Рекомендуется игнорировать OnChanged от предыдущих вызовов DescribeSemaphore, если выполняется новый замещающий вызов, например, запоминая текущий id вызова.

Вызов OnChanged(false) может происходить не только из-за отмены новым DescribeSemaphore, но и по другим причинам, например, при временном разрыве соединения между grpc клиентом и сервером, при временном разрыве соединения между grpc сервером и текущим лидером сервиса, при изменении лидера сервиса, то есть при малейшем подозрении, что нотификация могла быть потеряна. Для восстановления подписки клиентский код должен выполнить новый вызов DescribeSemaphore, правильно обрабатывая ситуацию, что результат нового вызова может быть другим (например, если нотификация действительно была потеряна).

Примеры