DataShard: блокировки и видимость изменений в транзакциях

YQL запросам может требоваться прочитать данные, которые были записаны предыдущими запросами в рамках той же транзакции, ещё до коммита транзакции. В этом случае YDB необходимо поддерживать консистентную видимость данных с учётом сделанных изменений. Для реализации этого сценария DataShard поддерживает запись незакомиченных изменений в рамках транзакции, а затем включает эти данные в последующих чтениях в рамках той же транзакции. Также он позволяет атомарно коммитить все накопленные изменения с учётом ограничений изоляции serializable.

Для реализации этой возможности DataShard использует поддержку транзакций с большим количеством изменений в нижестоящей LocalDB. Эти транзакции не ограничены размером одного сообщения между акторами в распределённой системе.

Высокоуровневая схема работы

YQL транзакции могут разбиваться на несколько «фаз» в KQP, используя результат одной фазы в качестве входа для последующих фаз. Это происходит либо для «сложных» транзакций, состоящих из нескольких подзапросов, либо «интерактивных», где клиент начинает транзакцию и выполняет запросы без коммита. Например, если YQL запрос содержит JOIN, первая фаза может прочитать левую таблицу, а затем во второй фазе использовать этот результат для выполнения точечных чтений по правой таблице.

Для читающих запросов KQP использует глобальный MVCC снимок для обеспечения консистентности между подзапросами. Для пишуших запросов нужно также убедиться во время коммита, что serializable изоляция не нарушается. Это обеспечивается с помощью Оптимистического управления параллелизмом, где чтения добавляют оптимистические блокировки для прочитанных диапазонов, а запись в других транзакциях «ломает» эти блокировки во время их коммита. Транзакция может успешно закоммититься только если ни одна из её блокировок не была сломана другой транзакцией. В противном случае транзакция завершится с ошибкой «transaction locks invalidated».

Транзакция может сделать несколько чтений используя временные метки чтения, пока другие транзакции параллельно пишут в те же таблицы и шарды. Это может быть как один глобальный MVCC снимок, так и несколько временных меток, разные для каждого чтения. Когда транзакция коммитится ей назначается единое время коммита в глобальном порядке выполнения. Если все чтения можно повторить в точке времени коммита, без изменения видимых результатов, такая транзакция могла бы целиком выполниться в точке с временем коммита. Не сломанная оптимистическая блокировка показывает транзакции, что эти чтения можно переместить в более позднюю точку с временем коммита.

Незакомиченные изменения немногим отличаются от чтений с точки зрения сериализации. Если DataShard может запомнить эти изменения и затем «переместить» их в финальную точку с временем коммита, такая транзакция может быть закоммичена, иначе она должна быть отменена. В отличии от блокировок чтения (которые хранятся в памяти), незакомиченные изменения персистентны, и их нужно правильно удалять когда в них больше нет необходимости.

Как блокировки используются во время чтений

Подготавливаемым на DataShard'ах операциям обычно назначают глобально уникальный 64-битный TxId. Они выделяются достаточно большими батчами с помощью глобальных таблеток TxAllocator. Когда KQP начинает первое чтение в многофазной транзакции, он также использует этот TxId в качестве идентификатора блокировки LockTxId, который в дальнейшем используется во всех запросах в той же YQL транзакции. DataShard добавляет новые блокировки, если в операции указан ненулевой LockTxId:

  • См. поле LockTxId в читающих TEvRead запросах
  • См. поле LockTxId в пишущих TEvWrite запросах
  • См. поле LockTxId в сообщениях TDataTransaction (используется для кодирования транзакций, которые работают с данными таблиц)

Также используется поле LockNodeId, в котором указывается идентификатор ноды с источником блокировки. DataShard использует это поле для подписки на статус блокировки и зачистки блокировок, которые перестают использоваться.

LockTxId — это просто уникальное число, которое используется множеством операций в одной и той же YQL транзакции, в то время как TxId уникальный для каждой операции на одном DataShard'е. Использование первого TxId в качестве LockTxId не обязательно, но т.к. KQP уже получил глобально уникальное число с выделением TxId, а LockTxId не пересекается с TxId по использованию, это экономит аллокацию.

Таблица блокировок (см. datashard_locks.h и datashard_locks.cpp) индексирует блокировки по диапазонам первичного ключа через дерево регионов, позволяя находить и «ломать» их по точечным ключам. В простом случае читающие операции добавляют диапазон с помощью метода SetLock, а пишущие операции ломают другие блокировки по записанному ключу используя метод BreakLocks.

Когда блокировка добавляется впервые, ей назначается монотонно возрастающий Counter в текущем Generation таблетки (см. сообщение TLock). Также строка с этими числами появляется в виртуальной таблице /sys/locks (сама таблица больше не используется). Информация о взятых блокировках возвращается в результирующих сообщениях (например см. TEvReadResult).

В успешном сценарии, когда пока блокировка существует и не сломана, поля Generation и Counter не меняются.
В неуспешных сценариях ранее установленные блокировки инвалидируются. Например, в случае инвалидации блокировки при рестарте изменяется значение Generation, а Counter может менять при явном статусе ошибки или при инвалидации и пересоздании блокировки в том же поколении.

Первые Generation и Counter для каждого шарда запоминаются в KQP. Если во время жизни транзакции они изменились, это сигнализируют о неконсистентности и нарушении serializable изоляции. Пишущие транзакции, либо транзакции которые не используют глобальный MVCC снимок для эффективности, проверяют значения Generation и Counter блокировок при финальный коммите. Коммит может завершиться успехом только если все значения всех блокировок совпадают.

При использовании глобального MVCC снимка все чтения с этим снимком гарантированно консистентны. Тем не менее, они происходят со взятием блокировок на случай, если транзакция в дальнейшем окажется пишущей. DataShard при этом дополнительно проверяет наличие конфликтующих данных, закомиченных поверх читаемого снимка. При обнаружении конфликта, чтение не завершается с ошибкой, однако поле Counter блокировки выставляется равным ErrorAlreadyBroken. Это сигнализирует о том, что блокировку взять не удалось и записать что-то в этой транзакции в дальнейшем не получится. Если окажется, что вся транзакция целиком выполняла только чтения, то она завершится успешно. Однако, при любой попытке что-либо записать такая транзакция сразу же завершится с ошибкой, так как коммит будет гарантированно не возможен.

Как блокировки используются для записи изменений

Когда KQP необходимо сделать незакомиченные изменения в рамках YQL транзакции, в операции записи на DataShard'е также указывается ненулевой LockTxId. Помимо установки блокировок DataShard использует этот LockTxId в качестве TxId для записи незакомиченных изменений. Эти изменения будут храниться пока блокировка остаётся валидной. Блокировки с незакомиченными изменениями называются пишущими блокировками (write locks), а информация по ним сохраняется в специальных таблицах (см. таблицу Locks и связанные таблицы ниже), что позволяет им переживать рестарты DataShard'ов.

При использовании операций записи незакомиченных изменений есть несколько ограничений:

  • Операция должна выполняться в режиме immediate, таким образом нельзя записать незакомиченные изменения распределённой транзакцией. Вместо этого изменения пишутся независимо на каждый шард.
  • В операции должен быть указан LockNodeId с нодой, через которую DataShard подписывается на статус блокировки и автоматически удаляет изменения, если блокировка пропадает. Например, блокировка пропадает, если транзакция отменяется нештатно или нода с состоянием транзакции не отвечает.
  • У операции должен быть указан правильный MVCC снимок, который используется для чтений и поиска конфликтов. Ожидается, что в рамках одной YQL транзакции используется один и тот же MVCC снимок во всех операциях чтения и записи.
  • Существующая блокировка должна быть валидна и не сломана. В противном случае для указанного LockTxId не должно быть уже имеющихся нескомпакченных данных в LocalDB. Это защищает от нештатных ситуаций, когда транзакция отменяется на шарде (например, на основе статуса блокировки), а KQP пытается продолжить в ней писать.

При чтениях YQL транзакция должна указывать тот же LockTxId. При этом чтения через LocalDB будут использовать особую карту транзакций, в которой LockTxId как-будто уже закоммичен. Это позволяет транзакции видеть собственные изменения, но не изменения других незакомиченных транзакций. В карте транзакций добавляется особая запись [LockTxId] => v{min}, позволяя видеть собственные изменения независимо от значения конкретного MVCC снепшота.

Незакомиченные записи требуют дополнительного анализа конфликтов (см. CheckWriteConflicts в реализации minikql engine host). Когда несколько незакомиченных транзакций пытаются писать в один и тот же ключ, DataShard должен убедиться, что в случае коммита соответствующие им конфликтующие блокировки будут сломаны. Объекты для отслеживания транзакций (например TLockedWriteTxObserver) используются для обнаружения таких конфликтов. LocalDB вызывает соответствующие методы этих объектов перед пропуском или применением изменений. Перед записью DataShard выполняет чтение в специальном режиме для поиска всех незакомиченных изменений по ключу. В случае обнаружения конфликта они добавляются в граф конфликтов между блокировками. Таким образом, каждая блокировка знает какие блокировки должны быть сломаны в случае коммита.

Если на шарде есть незакомиченные изменения, то чтения также должны дополнительно проверять конфликты. Это необходимо, так как чтения должны быть не только консистентными в рамках текущей транзакции, но и учитывать согласованность состояния с будущей точкой времени коммита. Здесь также используются объекты для отслеживания, чтобы собирать списки незакомиченных транзакций, пропущенных во время чтения. На их основе добавляются связи в графе между пишущими и читающими блокировками.

Особой обработки требует ситуация, когда транзакция делает чтение с учётом собственных незакомиченных изменений, которые были поверх ранее закомиченных изменений, но сделанных уже после получения MVCC снимка в транзакции. Пример подобной ситуации:

  1. Ключ K изначально имеет A = 1 в версии v4000/100. Два числа здесь — это Step (4000) и TxId (100) времени коммита.
  2. Tx1 открывается и получает MVCC снимок v5000/max.
  3. Tx2 коммитит слепую запись UPSERT с B = 2 в версии v6000/102.
  4. Tx1 делает слепую запись UPSERT с C = 3 и TxId = 101.
  5. В этот момент Tx1 ещё могла бы закоммититься успешно, так как она не читала ключ K и изменения с C = 3 всё ещё может переместиться в будущее время коммита.
  6. Tx1 выполняет чтение по ключу K, которое выполняется с использованием снимка v5000/max:
    • На время чтения будет использоваться временная запись [101] => v{min} в карте транзакций;
    • Итерация спозиционируется на первой дельте с изменением C = 3 (т.к. v{min} <= v5000/max), которое будет добавлено в состояние строки;
    • Все прочие закомиченные дельты будут также добавлены, т.е. состояние строки будет включать B = 2 которое в этот момент уже закоммичено;
    • Однако, здесь возникает конфликт, т.к. B = 2 было закоммичено уже после MVCC снимка (v6000/102 > v5000/max);
    • Это обнаружится в методе OnApplyCommitted, который вызывает CheckReadConflict;
    • Так как подобное состояние строки не соответствует изоляции, блокировка транзакции будет автоматически сломана, а также будет взведён флаг неконсистентности чтения.
  7. С ошибкой завершится не только данное чтение, но также Tx1 не сможет быть закоммичена, т.к. произошло нарушение изоляции.
  8. Клиентское приложение получит ошибку «transaction locks invalidated» и может принять решение о повторе транзакции с начала.

Взаимодействие с накоплением изменений

У DataShard'ов есть так называемые накопители изменений (change collectors), которые логируют происходящие изменения и потоково отправляют их в другие подсистемы. В частности, это используется для поддержки Change Data Capture и Асинхронных вторичных индексов. В зависимости от режима работы, накопителям изменений может требоваться знать предыдущее состояние строки, и для этого они выполняют чтение строки перед её записью.

Накопителям изменений также нужно обрабатывать запись незакомиченных изменений. Однако эти записи нельзя отправлять в другие системы до коммита. Также порядок отправки изменений должен совпадать с финальным порядком их применения. Для поддержки этих требований изменения сначала аккумулируются в отдельной таблице LockChangeRecords, а с случае успешного коммита транзакции они атомарно добавляются в выходной поток с помощью добавления одной строки в таблице ChangeRecordCommits. Таким образом, обработка изменений происходит постепенно, что избавляет от дорогостоящей массовой обработки во время коммита. Объем накапливаемых изменений не ограничен.

Взаимодействие с распределёнными транзакциями

Когда распределённая транзакция начинает выполнение, она валидирует блокировки на текущем шарде и рассылает результат на других участников транзакции. Если все результаты получены и оказались успешными, то тело транзакции может продолжить выполнение и применить эффекты. Другими словами, если все чтения со всех участвующих шардов удаётся переместить на время коммита без нарушения гарантий изоляции, то транзакция может применить все накопленные изменения. В противном случае тело транзакции не выполняется, возвращается соответствующий код ошибки, а все накопленные изменения отменяются. Результаты валидации блокировок запоминаются персистентно. Корректность выполнения обеспечивается с помощью анализа конфликтов между транзакциями.

В отличии от обычных транзакций, коммит блокировки с незакомиченными изменениями может затрагивать произвольные ключи, которые не хранятся в памяти DataShard'а. Соответственно, проанализировать конфликты по пересечению ключей не представляется возможным. После валидации пишущей блокировки, она «замораживается» и больше не может сломаться. Если конфликтующая транзакция попытается сломать замороженную блокировку, она вместо этого ставится на паузу и дожидается пока транзакция с замороженной блокировкой завершится. DataShard следит за тем, коммит каких блокировок может сломать другие блокировки. Для защиты от тупиковой ситуации он также проверяет может ли коммит транзакции А изменить результат валидации транзакции Б и наоборот. Если транзакция А должна выполниться до транзакции Б в глобальном порядке транзакций, DataShard не будет начинать валидацию транзакции Б пока транзакция А не завершится. Пока транзакции не конфликтуют они могут выполняться в любом порядке.

Коммит изменений

Для коммита ранее записанных изменений, KQP запускает на шардах транзакцию, которая коммитит ранее захваченные блокировки. У такой транзакции не указывается LockTxId, так как такой коммит не захватывает новые блокировки. В списке Locks перечисляются все ранее установленные блокировки, а в поле Op указывается Commit. Сама транзакция коммита на шардах может быть либо immediate (если затрагивает только один шард), либо подготовлена как распределённая транзакция на нескольких участниках.

Для поддержки распределённых транзакций все шарды, которые проверяют блокировки, включаются в список SendingShards, а все шарды, применяющие эффекты, включаются в список ReceivingShards. Во время валидации отправляющие шарды подготавливают персистентные readset'ы и отправляют их на все принимающие шарды. Принимающие же шарды после валидации ждут получения readset'ов перед тем как начинать выполнение транзакции. Во время выполнения, в случае если все результаты валидации были успешными, блокировки коммитятся через вызов KqpCommitLocks. В противном случае, тело транзакции не выполняется, а блокировка удаляется вместе с отменой всех незакомиченных изменений через вызов KqpEraseLocks. Так как в случае ошибки коммита изменения отменяются, отдельно коммит ретраить нельзя.

Примечание

Все незакомиченные изменения с одинаковым LockTxId должны быть включены в транзакцию коммита, и транзакцию нельзя коммитить частично. Например, если транзакция сделала несколько записей, и на одной из записей произошла ошибка (например в случае кратковременной недоступности шарда, когда нельзя однозначно понять была ли запись произведена), то будет некорректно пропустить эту запись и закоммитить транзакцию частично. Если в будущем произойдёт мерж шардов, то несовпадение статуса транзакции может приводить к аномалиям.

В транзакции коммита также могут быть дополнительные изменения, которые атомарно применяются сразу после коммита блокировки. Для уменьшения задержек KQP старается аккумулировать изменения в памяти либо до момента их коммита, либо до необходимости чтения этих изменений в той же транзакции. Далее KQP отправляет накопленные изменения вместе с транзацией коммита. В случае, если удаётся накопить все изменения до момента коммита, незакомиченных изменений не происходит, а блокировки при необходимости берутся только на чтение.

При необходимости явной отмены изменений YQL транзакция запускает пустую транзакцию на шардах с указаем Rollback в качестве операции блокировки. Если по какой-то причине этого не происходит, при разрушении TLockHandle все подписанные даташарды автоматически чистят блокировки через внутреннюю транзакцию TxRemoveLock. Явная отмена предпочтительна, т.к. незакомиченные изменения потребляют ресурсы и в случае асинхронной очистки через TLockHandle нет гарантии, что эти ресурсы освободятся до того, как новые транзакции попытаются записать ещё больше изменений.

Ограничения

Наличие незакомиченных изменений приводит к тому, что всем новым попыткам записи на тот же шард приходится дополнительно проверять конфликты, из-за чего запись перестаёт быть «слепой». Чтение перед каждой записью делает их дороже, увеличивает время выполнения транзакций и уменьшает пропускную способность. Для максимальной производительности рекомендуется выполнять транзакции таким образом, что сначала выполняются все чтения, а затем небольшое количество слепых записей. В этом случае незакомиченные изменения не используются и производительность шардов остаётся оптимальной.

Из-за ограничений LocalDB даташарды также вынуждены гарантировать, что не будет образовываться слишком много открытых транзакций. Сейчас таблица блокировок ограничена примерно 10 тысячами записей, что включает в себя пишущие блокировки. Даташарды также вынуждены подсчитывать количество уже имеющихся незакомиченных изменений перед каждой новой записью, что реализовано через подсчёт пропуском в объекте для отслеживания транзакций. В случае превышения лимита кидается исключение TLockedWriteLimitException.

Персистентные блокировки переживают рестарты даташардов и восстанавливают своё последнее состояние. В этом механизме поддерживается возможность сохранения диапазонов, но на практике она не используются, так как даташард был бы вынужден записывать новые диапазоны во время каждого чтения, что слишком дорог). Вместо этого читающие блокировки, ставшие персистентными из-за наличия записи, восставливаются с диапазоном «весь шард». Это лучше, чем инвалидация на любом рестарте, но конфликтует с любой записью и увеличивает риск ошибки «transaction locks invalidated» (TLI).