LocalDB: персистентные незакомиченные изменения

Таблеткам может понадобиться сохранять большое количество изменений на протяжении длительного времени, а затем целиком закоммитить или отменить их. Для поддержки этого сценария LocalDB позволяет маркировать незакомиченные изменения в таблицах уникальным 64-битным идентификатором транзакции (TxId). Эти данные сохраняются в таблице рядом с закомиченными данными, но не видны для остальных запросов до коммита. Коммит или отмена транзакции атомарная и дешёвая, а со временем эти данные интегрируются в таблицу в обычном закомиченном виде. Таким образом, незакомиченные данные не хранятся в оперативной памяти длительное время и не ограничены её объемом.

Этот механизм используется в качестве основы для следующей функциональности:

  • Сохранение незакомиченных изменений в долгих YQL транзакциях и возможность видеть эти изменения в той же транзакции до их коммита;
  • Сохранение эффектов в волатильных распределённых транзакциях с возможностью отката;
  • Межкластерная консистентная репликация, в которой изменения потоково применяются небольшими батчами, и периодически коммитятся, делая видимым очередной консистентный снимок исходной базы данных.

Ограничения

Этот механизм имеет следующие ограничения:

  1. По одному ключу можно параллельно делать изменения в рамках разных TxId, но эти транзакции должны коммититься в том же порядке, в котором происходила запись изменений. Например, если по ключу K было изменение от транзакции tx1, а затем от транзакции tx2, то можно закоммитить транзакцию tx1, а затем tx2, и оба изменения будут видны. Однако, если сначала закоммитить tx2, то tx1 должна быть отменена.
  2. Количество незакомиченных транзакций и объём данных в них по конкретному ключу должны быть ограничены вышестоящими уровнями.
  3. TxId должен быть глобально уникальным и не должен переиспользоваться после коммита или отмены транзакции, в том числе между разными шардами.

Запись незакомиченных изменений

В логе восстановления (см. flat_redo_writer.h) есть следующие события:

  • EvUpdateTx сохраняет изменения в таблице с указанием незакомиченного TxId. Это событие создаётся методом TDatabase::UpdateTx локальной базы данных (LocalDB).
  • EvRemoveTx используется для удаленения указанного TxId, когда выполняется отмена транзакции. Это событие создаётся методом TDatabase::RemoveTx локальной базы данных.
  • EvCommitTx используется для коммита указанного TxId с указанием MVCC версии коммита. Это событие создаётся методом TDatabase::CommitTx локальной базы данных.

Хранение незакомиченных изменений в MemTable

MemTable в LocalDB — это небольшое отсортированное по ключу дерево недавно сделанных изменений, которое хранится в памяти. В качестве ключа в этом дереве используется ключ таблицы, а в качестве значения — указатель на цепочку изменений для соответствующего ключа. Для каждого изменения указывается MVCC версия этого изменения (пара Step/TxId с глобальным временем коммита). Строки в цепочке изменений смержены в рамках одного MemTable. Например, представим что у нас были следующие операции для ключа K:

Версия Операция
v1000/10 UPDATE ... SET A = 1
v2000/11 UPDATE ... SET B = 2
v3000/12 UPDATE ... SET C = 3

Тогда цепочка строк с изменениями для этого ключа в единственном MemTable будет выглядеть следующим образом:

Версия Строка
v3000/12 SET A = 1, B = 2, C = 3
v2000/11 SET A = 1, B = 2
v1000/10 SET A = 1

В некоторых случаях MemTable может разделиться между операциями, что происходит, например, с началом компакшена. Тогда MemTable из примера выше выглядел бы так:

MemTable Версия Строка
Эпоха 2 v3000/12 SET B = 2, C = 3
Эпоха 2 v2000/11 SET B = 2
Эпоха 1 v1000/10 SET A = 1

Новые изменения применяются к текущему MemTable, даже если они ещё не были закомичены. При записи незакомиченные изменения помечаются специальной версией, с указанием максимального значения для Step (как бы подразумевая далёкое будущее) и TxId транзакции в качестве TxId. Мёрж нижестоящих изменений не производится. Например, представим, что далее мы выполнили следующие незакомиченные операции для ключа K:

TxId Операция
15 UPDATE ... SET C = 10
13 UPDATE ... SET B = 20

Цепочка изменений в MemTable теперь будет выглядеть так:

Версия Строка
v{max}/13 SET B = 20
v{max}/15 SET C = 10
v3000/12 SET A = 1, B = 2, C = 3
v2000/11 SET A = 1, B = 2
v1000/10 SET A = 1

Во время чтения для изменений с версией Step == max, итератор проверяет их TxId по таблице транзакций. Для закомиченных транзакций там указана MVCC версия коммита. Далее он применяет все закомиченные изменения, пока не найдёт и не применит первую смерженную запись, у которой версия Step != max.

Представим, что мы закоммитили транзакцию с TxId = 13 в точке v4000/20. В этот момент в таблице транзакций появится запись [13] => v4000/20, и эта транзакция станет закомиченной. Все последующие чтения будут видеть изменения с TxId = 13, пропуская транзакцию с TxId = 15, так как она не была закоммичена до TxId = 13. Цепочка изменений для ключа K при этом не меняется.

Теперь представим, что мы выполнили SET A = 30 в версии v5000/21. Итоговая цепочка изменений будет выглядеть следующим образом:

Версия Строка
v5000/21 SET A = 30, B = 20, C = 3
v{max}/13 SET B = 20
v{max}/15 SET C = 10
v3000/12 SET A = 1, B = 2, C = 3
v2000/11 SET A = 1, B = 2
v1000/10 SET A = 1

Новая строка была добавлена в смёрженном состоянии, включая изменения для TxId = 13. Так как транзакция TxId = 15 не закоммичена, её изменения были пропущены, что отразилось в смёрженном состоянии для v5000/21. Важно, чтобы вышестоящий код не закоммитил TxId = 15 после этого, так как подобный коммит приведёт в аномалии: часть версий на чтении будут видеть транзакцию как закомиченную, а часть не будет.

Компакшен незакомиченных изменений

Компакшен берёт часть данных таблицы, мёржит их в отсортированном порядке и пишет в виде новой Sorted String Table (SST), которая заменяет собой скомпакченные данные. Если в компакшене участвует MemTable, то это подразумевает также компакшен части лога изменений. В скомпакшенном логе могут встречаться сообщения EvRemoveTx/EvCommitTx, которые меняют таблицу закомиченных транзакций — они также должны оказаться в персистентном сторадже. Это реализовано с помощью записи TxStatus блобов (см. flat_page_txstatus.h), которые LocalDB сохраняет наравне с записью SST с данными. В этих блобах сохраняется текущий список закомиченных и удалённых транзакций, который заменяют собой весь ранее записанный лог в части EvRemoveTx/EvCommitTx событий. На вход в компакшен передаётся ссылка на всю текущую таблицу, но во время записи таблица фильтруется, сохраняя только те транзакции, которые упоминались в компактящихся MemTable или предыдущих TxStatus страницах.

На уровне страниц с данными (см. flat_page_data.h) незакомиченные изменения из MemTable (или других SST) агрегируются по TxId с сохранением их относительного порядка и добавляются перед основной записью в виде незакомиченных дельта записей. У записи есть MVCC флаги (HasHistory, IsVersioned, IsErased), которые говорят о наличии по ключу MVCC истории и наличии полей с MVCC версией строки. Для дельта записей используется особый флаг IsDelta, который представляет из себя флаг HasHistory без указания других MVCC флагов. Для строк с дельта флагом после фиксированной части в полях расширения хранится структура TDelta, в которой указывается TxId незакомиченной на момент компакшена транзакции.

По одному ключу может быть несколько дельта записей, а также опционально самая свежая закомиченная запись. Так как по одному ключу на странице данных может быть только одна запись, таблица смещений указывает на первую дельта запись, а остальные записи при этом доступны через таблицу альтернативных свещений для этой записи:

Смещение Описание
-X*8 смещение Main
... ...
-16 смещение Delta 2
-8 смещение Delta 1
0 заголовок Delta 0
... ...
смещение Delta 1 заголовок Delta 1
... ...
смещение Main заголовок Main

Имея указатель на запись Delta 0, остальные записи для ключа можно получить через метод GetAltRecord(size_t index), где index — номер записи в цепочке (1 для Delta 1). Цепочка заканчивается либо указателем на основную запись (без дельта флага), либо 0, если основная запись отсутствует.

Продолжая начатый выше пример, предположим, что после записи транзакции с TxId = 13 текущий MemTable целиком скомпактили. Запись по 32-битному ключу K может выглядеть следующим образом (смещения указаны относительно указателя в списке записей на странице):

Смещение Значение Описание
-16 58 смещение Main
-8 29 смещение Delta 1
0 0x21 Delta 0: IsDelta + ERowOp::Upsert
1 0x00 .. ключевая колонка не-NULL
2 K .. ключевая колонка (32-bit)
6 0x00 .. колонка A пустая
7 0 .. колонка A (32-bit)
11 0x01 .. колонка B = ECellOp::Set
12 20 .. колонка B (32-bit)
16 0x00 .. колонка C пустая
17 0 .. колонка C (32-bit)
21 13 .. TDelta::TxId
29 0x21 Delta 1: IsDelta + ERowOp::Upsert
30 0x00 .. ключевая колонка не-NULL
31 K .. ключевая колонка (32-bit)
35 0x00 .. колонка A пустая
36 0 .. колонка A (32-bit)
40 0x00 .. колонка B пустая
41 0 .. колонка B (32-bit)
45 0x01 .. колонка C = ECellOp::Set
46 10 .. колонка C (32-bit)
50 15 .. TDelta::TxId
58 0x61 Main: HasHistory + IsVersioned + ERowOp::Upsert
59 0x00 .. ключевая колонка не-NULL
60 K .. ключевая колонка (32-bit)
64 0x01 .. колонка A = ECellOp::Set
65 1 .. колонка A (32-bit)
69 0x01 .. колонка B = ECellOp::Set
70 2 .. колонка B (32-bit)
74 0x01 .. колонка C = ECellOp::Set
75 3 .. колонка C (32-bit)
79 3000 .. RowVersion.Step
87 12 .. RowVersion.TxId
95 - Конец записи

При этом оставшиеся две записи будут в исторических данных в ключами (RowId, 2000, 11) и (RowId, 1000, 10), соответственно. Об их наличии которых говорит флаг HasHistory в основной записи.

В процессе компакшена итератор работает в специальном режиме с перебором всех дельт и всех версий для каждого ключа. Реализация скана для компакшена (см. flat_ops_compact.h) сначала агрегирует незакомиченные на момент начала компакшена дельты по TxId с сохранением их относительного порядка. В случае, если изменения от разных TxId накладываются друг на друга, их порядок может поменяться произвольным образом. Такое изменение порядка валидно, так как такие транзакции пересекаются по порядку записи и вышестоящий уровень не имеет права коммитить их одновременно. Когда все незакомиченные дельты сагрегированы, они пишутся в правильном порядке в результирующую SST (см. flat_part_writer.h и flat_page_writer.h). После этого начинается перебор версий строк для ключа, которые пишутся в SST в порядке уменьшения версий.

Когда итератор встречает первую закомиченную дельту, версия коммита этой дельты берётся в качестве версии строки. В состояние строки мёржатся все закомиченные дельты ниже, включая первую закомиченную версию строки с каждого уровня LSM, участвующего в компакшене. Для перехода к следующей версии итератор пропускает закомиченные дельты с версией большей либо равной последней версии и процесс повторяется.

Если по одному ключу записали и скомпактили большое количество дельт, после чего быстро закомитили с разными версиями, то формирование всех версий строки будет расти квадратично с ростом кол-ва дельт. Это одна из причин, по которой вышестоящие уровни должны контролировать количество дельт по ключу и не допускать их бесконтрольного роста. Другим ограничением сейчас является то, что все незакомиченные дельты необходимо держать в памяти, т.к. они могут быть закомиченны в любой момент и мы должны проверять это во время поиска. При этом все дельты для ключа должны находиться на одной странице.

Статистика по незакомиченным транзакциям

Оптимистично большинство пишущих транзакций в итоге коммитятся успешно, но иногда транзакции приходится отменять, в том числе уже после компакшена. Так как после их отмены большое количество данных может оказаться по факту удалено, для целей сборки мусора в SST сохраняется страница TxIdStats (см. flat_page_txidstat.h) со статистикой по количеству строк и байт, занимаемых каждым TxId. По мере того, как транзакции отменяются, стратегия компакшена агрегирует количество высвобождающихся байт, и может принять решение о запуске дополнительных компакшенов для сборки мусора.

Страницы TxIdStats также используются для постепенного уменьшения таблицы коммитов. После того, как транзакция оказывается закоммичена, запись об этом хранится в хеш-таблице в памяти до тех пор, пока на соответствующий TxId есть хотя бы одна ссылка. По мере компакшена SST незакомиченные дельты перезаписываются в виде закомиченных записей и ссылки на страницах TxIdStats пропадают. Если для соответствующего TxId был сделан коммит или отмена, плюс на него не остаётся ссылок и в MemTable и в SST, то эта запись удаляется из таблицы коммитов и освобождает память.

Размер таблицы коммитов в памяти ограничивается на уровне даташардов количеством открытых транзакций в каждый момент времени. Открытые транзакции — те, которые ещё не закоммичены или не отменены. Во время компакшена в SST могут быть записаны дельты только для открытых на момент начала компакшена транзакций, так как закомиченные дельты переписываются в обычные закомиченные строки, а от отменённых транзакций не остаётся каких-либо следов. Соответственно, размер таблицы коммитов ограничен количеством открытых транзакций помноженным на количество SST.

Одалживание SST с незакомиченными изменениями

При копировании таблиц, а также при сплитах и мёржах DataShard используется механизм «одалживания» SST в LocalDB. При этом SST из шарда-источника мёржится в соответствующую таблицу на шарде-приёмнике. С добавлением незакомиченных изменений там могут быть как изменения из ещё открытых транзакций (актуально в случае репликации), так и закомиченные, но ещё не скомпакченные изменения. Для того, чтобы на шардах-приёмниках данные читались в том же виде, как и на шарде-источнике, одалживаются и мёржатся в том числе TxStatus блобы. Информациюя из них добавляется в таблицу коммитов.

Информация по транзакциям может отличаться на разных шардах. Рассмотрим новый пример:

  1. Транзакция пишет изменение по ключу K с TxId на шард S, при этом данные компактятся и изменения оказывается в крупной SST.
  2. Шард S начинает превышать порог по размеру и сплитится на два шарда L и R.
  3. SST, в котором есть изменения по ключу K, попадают на оба шарда с наложением фильтра.
  4. Ключ K оказывается в шарде L, но транзакция также существует на шарде R, так как на неё есть ссылка из одолженной SST.
  5. Транзакция коммитит изменение на шарде L, но при этом коммита на шарде R не происходит, така как логически и с точки зрения писателя на шарде R не было изменений.
  6. Так как транзакция завершается, TxId на шарде R через какое-то время откатывается, так какв реальности в данных шарда при этом ничего не меняется.
  7. Допустим, в дальнейшем шарды L и R снова смержились.
  8. В такой ситуации в TxStatus от шарда L будет коммит, а в TxStatus от шарда R будет отмена для того же TxId. Так как транзакции успешно коммитят все изменения, а отмена может быть в том числе из-за наложенного фильтра, в случае мержа конфликтующей информации по TxId коммит «выигрывает» над отменой.

На практике DataShard перед очередным сплитом или мержем полностью компактятся, так что конфликтующая информация по статусу коммита не должна возникнуть. Например, это означает, что транзакции не могут решить закоммитить только часть изменений, все изменения для конкретного TxId должны быть закоммичены вместе. Это также означает, что идентификаторы транзакций нельзя переиспользовать, в том числе между разными шардами. В качестве TxId для незакомиченных изменений также могут использоваться только глобально уникальные идентификаторы.