DataShard: distributed transactions

YDB implements distributed transactions, which are based on ideas from the Calvin: Fast Distributed Transactions for Partitioned Database Systems paper. These transactions consist of a set of operations performed by a group of participants, such as DataShards. Unlike Calvin, these operations are not required to be deterministic. To execute a distributed transaction, a proposer prepares the transaction at each participant, assigns a position (or a timestamp) to the transaction in the global transaction execution order using one of the coordinator tablets, and collects the transaction results. Each participant receives and processes a subset of transactions it is involved in, following a specific order. Participants may process their part of the larger transaction at different speeds rather than simultaneously. Distributed transactions share the same timestamp across all participating shards and must include all changes from transactions with preceding timestamps. When viewed as a logical sequence, timestamps act as a single logical timeline where any distributed transaction fully happens at a single point in time.

When the execution of a transaction depends on the state of other participants, the participants exchange data using so-called ReadSets. These are persistent messages exchanged between participants that are delivered at least once and contain read results with the state of the transaction. The use of ReadSets causes transactions to go through additional phases:

  1. Reading phase: The participant reads, persists, and sends data that is needed by other participants. During this phase, KQP transactions (type TX_KIND_DATA, which have a non-empty TDataTransaction.KqpTransaction field and subtype KQP_TX_TYPE_DATA) validate optimistic locks. Older MiniKQL transactions (type TX_KIND_DATA, which have a non-empty TDataTransaction.MiniKQL field) perform reads and send arbitrary table data during this phase. Another example of using the reading phase is the distributed TTL transaction for deleting expired rows. The primary shard generates a bitmask matching expired rows, ensuring that both the primary and index shards delete the same rows.
  2. Waiting phase: The participant waits until it has received all the necessary data from the other participants.
  3. Execution phase: The participant uses both local and remote data to determine whether to abort or complete the transaction. The participant generates and applies the effects specified in the transaction body if the transaction is completed successfully. The transaction body typically includes a program that uses the same input data and leads all participants to come to the same conclusion.

Participants are allowed to execute transactions in any order for efficiency. However, it's crucial that other transactions can't observe this order. Transaction ordering based on a coordinator's assigned timestamps ensures strict serializable isolation. In practice, single-shard transactions don't involve a coordinator, and shards use a locally consistent timestamp for such transactions. Variations in the arrival times of distributed transaction timestamps weaken the isolation level to serializable.

Additionally, YDB has support for "volatile" distributed transactions. These transactions allow participants, including coordinators, to store transaction data in volatile memory, which is lost when the shards are restarted until the transaction is completed and the effects are persisted. This also allows participants to abort the transaction until the very last moment, which will be guaranteed to abort for all other participants. Using volatile memory removes persistent storage from the critical path before the transaction execution, reducing latency.

When executing the user's YQL transactions, YDB currently uses distributed transactions only for the final commit of non-read-only transactions. Queries before the commit of a YQL transaction are executed as single-shard operations, using optimistic locks and global multi-version concurrency control (MVCC) snapshots to ensure data consistency.

Basic distributed transactions protocol

Operations that can be performed as distributed transactions in YDB include various types of participants. The basic protocol for distributed transactions is the same regardless of the type of transaction, with some notable differences in the schema changes, which have additional requirements to ensure these transactions are idempotent.

Distributed transactions are managed using proposer actors. Some examples of these are:

  • TKqpDataExecutor executes DML queries, including distributed commits.
  • SchemeShard executes distributed transactions for schema changes.
  • TDistEraser executes a distributed transaction to consistently erase rows in tables with secondary indexes that match time-to-live (TTL) rules.

Distributed transactions in YDB are similar to two-phase commit protocols. The proposer actor goes through the following phases when executing a distributed transaction:

  1. Determining participants: The proposer actor selects specific shards (TabletId) that are required for transaction execution. A table may consist of many shards (DataShard tablets with unique TabletId identifiers), but a particular transaction may only affect a smaller set of these shards based on the affected primary keys. This subset is fixed at the start of the transaction and cannot be changed later. Transactions that only affect a single shard are called "single-shard" transactions and are processed in what is known as the "immediate execution" mode.
  2. Prepare phase: The proposer sends a special event, usually called TEvProposeTransaction (there is also the TEvWrite variant in DataShards), which specifies a TxId, a transaction identifier, unique within a particular cluster, and includes the transaction body (operations and parameters). Participants validate whether the specified transaction can be executed, select a range of allowed timestamps, MinStep and MaxStep, and reply with a PREPARED status on success.
    • For single-shard transactions, the proposer typically specifies an "immediate execution" mode (Immediate). The shard executes such transactions as soon as possible (at an unspecified timestamp consistent with other transactions) and replies with the result rather than PREPARED, which causes the planning phase to be skipped. Some special single-shard operations, such as TEvUploadRowsRequest, which implements BulkUpsert, don't even have a globally unique TxId.
    • The persistent transaction body is stored in the shard's local database, and the participant must ensure that it is executed when planned. In certain cases (for example, when performing a blind UPSERT into multiple shards), the participants must also ensure that the transaction is executed successfully, which may be in conflict with specific schema changes.
    • The volatile transaction body is stored in memory, and the participant responds with PREPARED as soon as possible. Future execution, whether successful or not, is not guaranteed in any way.
    • The proposer moves on to the next phase when they have received responses from all the participants.
    • It's not safe to re-send the propose event, except for schema operations, which, thanks to special idempotency fields, guarantee that a particular transaction will be executed exactly once.
  3. Planning phase: When the proposer has received PREPARED replies from all participants, it calculates the aggregated MinStep and MaxStep values and selects a coordinator to assign the timestamp to the transaction. A TEvTxProxy::TEvProposeTransaction event is sent to the selected coordinator, which includes the TxId and a list of participants.
    • The transaction may only involve shards from the same database. Each shard attaches its ProcessingParams to the reply, which has the same list of coordinators when shards belong to the same database.
    • The coordinator is selected based on the received ProcessingParams because, historically, queries could be executed without specifying a database. The list of coordinators can only be determined from the participants.
    • When the TEvTxProxy::TEvProposeTransaction event is re-sent (currently only for schema transactions), it is possible that the transaction may have multiple timestamps associated with it. This is not typically a problem as the transaction will execute at the earliest possible timestamp, and any later timestamps will be ignored (the transaction will have been completed and removed by the time they occur).
  4. Execution phase: The proposer waits for responses from the selected coordinator and participants, collecting the overall transaction outcome.
    • In some cases, such as a temporary network disconnection or a shard restart, the proposer may try to re-establish the connection and wait for the result. This process may continue until the transaction has been completed and the result is available.
    • When it is impossible to retrieve the result of a transaction from at least one of participant due to network issues, the transaction usually fails with an UNDETERMINED status, indicating that it is impossible to determine whether the transaction was successful.

Prepare phase in the DataShard tablet

Distributed transactions in the DataShard tablet begin with the Prepare phase, which can be proposed by one of the following events:

Events that don't have an Immediate execution mode specified will begin the Prepare phase for the distributed transaction. The transaction body will be validated to determine whether it's even possible to execute it (for example, by using the CheckDataTxUnit unit for generic data transactions). A range of timestamps will then be selected:

  • MinStep is selected based on the current mediator time or the wall clock.
  • MaxStep is determined by a planning timeout, which at the moment is 30 seconds for data transactions.

Then, the transaction is written to disk (for persistent transactions) or kept in memory (for volatile transactions), the shard replies with a PREPARED status and starts waiting for a plan that specifies the PlanStep for the given TxId. The planning deadline is important because if the proposer fails unexpectedly, the shard cannot determine whether the proposer has successfully planned the transaction for a future timestamp, so the shard must ensure that the transaction will be executed when planned (unless it's volatile). Since transactions that are not yet planned block some concurrent operations (such as schema and partitioning changes), a deadline is used to make it impossible to plan a transaction after a certain time. When the mediator time exceeds MaxStep and there is no corresponding plan for the transaction, then the protocol guarantees that it will not be possible to plan the transaction anymore. The transaction that reaches the deadline can then be safely removed.

Transactions are stored on disk and in memory using the TTransQueue class. Basic information about persistent transactions is stored in the TxMain table, which is loaded into memory when DataShard starts. The potentially large transaction body is stored in the TxDetails table and is not kept in memory while waiting. The transaction body is loaded into memory just before conflict analysis with other transactions in the pipeline.

Volatile transactions are stored in memory and are currently lost when DataShard restarts (they may be migrated during graceful restarts in the future). The restart aborts the transaction for all participants, and any participant can initiate the abort before the transaction body is executed and its effects are persisted. Shards use this feature to make schema and partitioning changes faster by aborting all pending transactions without waiting for a planning deadline.

The distributed transaction body needs to have enough information about the other participants so that each shard can know when it needs to generate and send outgoing ReadSets and which shards should expect and wait for incoming ReadSets. KQP transactions currently use ReadSets for validating and committing optimistic locks, which are described using TKqpLocks generated by the TKqpDataExecutor actor. This message describes the following shard sets:

  • SendingShards are shards that send ReadSets to all shards in the ReceivingShards set.
  • ReceivingShards are shards that expect ReadSets from all shards in the SendingShards set.

Volatile transactions expect all shards to be in the SendingShards set because any shard may abort the transaction and need to send its commit decision to other shards. They also expect all shards that apply changes to be in the ReceivingShards set. Whether or not changes are committed depends on the decisions of other shards. Exchanged ReadSet data is serialized into TReadSetData message with a single Decision field specified.

An example of a distributed transaction that doesn't use ReadSets is a persistent distributed transaction with blind writes. In this case, after successful planning, the transaction cannot be aborted, and the shards must ensure the future success of the transaction during the Prepare phase.

Planning phase

When all participants have provided their PREPARED responses, the proposer calculates the maximum MinStep and the minimum MaxStep, then selects a coordinator (which is currently implemented using a TxId hash) and sends a TEvTxProxy::TEvProposeTransaction event, which includes the TxId and a list of participants with the operation type (read or write) for each participant (even though this information is not currently used). The selected coordinator then selects the closest matching PlanStep and associates it with the specified TxId and the list of participants. Plan steps for persistent transactions are allocated every 10 milliseconds (this setting is called plan resolution), and the association is also stored on disk. Plan steps for volatile transactions are selected from those reserved for volatile planning and can be as frequent as every millisecond, and the association is only stored in memory.

Planning each plan step (which can contain zero or more transactions) involves distributing participants (and their transactions) to mediators. This is currently done by hashing the TabletId of each participant modulo the number of mediators. Each mediator receives a subset of the plan step in increasing the PlanStep order. The subset only includes matching participants and may be empty, even if the full plan step is not. If a mediator restarts or the network becomes temporarily disconnected, the coordinator reconnects and sends all unacknowledged plan steps again in order.

Plan steps with persistent transactions are only sent to mediators after being fully persisted to disk. They are only removed from the coordinator's local database when acknowledged by participants and are guaranteed to be delivered at least once. Plan steps with volatile transactions, on the other hand, are only stored in memory and may be lost if the coordinator restarts. When a plan step is resent, it may or may not include acknowledged transactions or previously sent volatile transactions that still need to be acknowledged. This includes empty plan steps. Only the latest empty plan step will be kept in memory for re-sending.

To reduce the number of errors during graceful restarts, the coordinator leaves its state actor in memory even after the tablet stops working. The address of this state actor is persisted after the instance has been fully initialized and before it is ready to accept new requests. New instances contact this state actor and transfer the last known in-memory state, including the list of planned volatile transactions. This state actor is also used to transfer any unused volatile planning reserves, allowing new instances to start faster without having to wait until those reserves expire.

Mediators receive a stream of TEvTxCoordinator::TEvCoordinatorStep events from each coordinator and merge them using the matching PlanStep field. Merged plan steps with steps less or equal to the minimum of the last step received from each coordinator are considered complete and are sent to participants using TEvTxProcessing::TEvPlanStep events. Each participant receives an event with the PlanStep, specifying the timestamp, and a list of TxId that must be executed at that timestamp. Transactions within each plan step are ordered based on their TxId. The (Step, TxId) pairs are then used as the global MVCC version in the database.

Participants acknowledge that they have received (and persisted in the case of a persistent transaction) each plan step by sending a TEvTxProcessing::TEvPlanStepAccepted event to the sender (which is a mediator tablet) and a TEvTxProcessing::TEvPlanStepAck event to the specified coordinator tablet actor (as specified in the AckTo field of each transaction). Plan steps and/or transactions will be considered delivered when these events have been processed and will not be resent.

Based on TEvTxProcessing::TEvPlanStepAccepted events, mediators also track which PlanStep has been delivered to all participants, inclusive. This maximum PlanStep is known as the current mediator time and is distributed to nodes with running DataShards through subscriptions to the TimeCast service. The current mediator time indicates that all possible TEvTxProcessing::TEvPlanStep events have been received and acknowledged by the shards up to and including the specified PlanStep. Therefore, shards should be aware of all transactions up to this timestamp. The current mediator time is helpful as it allows the shards to keep track of the time progressing even when they are not participating in distributed transactions. For efficiency, all shards are partitioned into several timecast buckets at each mediator. The current time in each bucket advances when all participants in transactions in that bucket acknowledge their TEvTxProcessing::TEvPlanStep events. The current mediator time is available to shards when they subscribe by sending TEvRegisterTablet event to the time cast service during shard startup and get the address of an atomic variable from the TEvRegisterTabletResult event. This atomic variable allows the system to avoid broadcasting many frequent events to idle shards.

DataShards handle TEvTxProcessing::TEvPlanStep events in the TTxPlanStep transaction. Transactions are found by their corresponding TxId, get their Step assigned, and then are added to the Pipeline. The Pipeline uses the PlanQueue to limit the number of concurrently running transactions and executes them in the (Step, TxId) order.

Execution phase in the DataShard tablet

The PlanQueue unit allows distributed transactions to start, subject to concurrency limits, in the increasing (Step, TxId) order. The transaction body is loaded from disk when needed (for evicted persistent transactions), KQP transactions finalize execution plans and arrive at the BuildAndWaitDependencies unit. This unit analyzes transaction keys and ranges declared for reading and writing and may add dependencies on earlier conflicting transactions. For example, when transaction A writes to key K and a later transaction B reads from key K, then transaction B depends on transaction A, and transaction B cannot start until transaction A completes. Transactions leave BuildAndWaitDependencies when they no longer have direct dependencies on other transactions.

Next, persistent KQP transactions execute the read phase (which includes validating optimistic locks) and generate outgoing OutReadSets in the BuildKqpDataTxOutRS unit. Then, the StoreAndSendOutRS persists outgoing ReadSets and access logs for optimistic locks. Optimistic locks that have attached uncommitted changes are marked with the Frozen flag, which prevents them from being aborted until the transaction completes. Otherwise, lock validity is ensured by assigning writes with a higher MVCC version and ensuring the correct execution order of conflicting transactions. Operations with access logs or outgoing ReadSets are added to the Incomplete set, which ensures that new writes can't change the validity of previous reads and generally need to use a higher MVCC version. However, new reads don't necessarily need to block on the outcome of the incomplete transaction and can use a lower MVCC version as long as it's consistent with other transactions.

Persistent KQP transactions prepare data structures for incoming InReadSets in the PrepareKqpDataTxInRS unit and begin waiting for all necessary ReadSets from other participants in the LoadAndWaitInRS unit. In some cases, such as blind writes to multiple shards without lock validation, distributed transactions may not require the exchange of ReadSets, and the ReadSet-related units don't perform any actions in these scenarios.

Finally, the KQP transaction operation reaches the ExecuteKqpDataTx unit. This unit validates local optimistic locks using previously persisted AccessLog data when available, validates ReadSets received from other participants, and, if everything checks out, executes the transaction body and returns a result. If lock validation fails locally or remotely, the transaction body is not executed, and the operation fails with an ABORTED status.

Volatile transactions

Volatile distributed transactions are stored only in memory and are lost when the shard is restarted. Nevertheless, they must guarantee that the distributed transaction either commits at all participants or is aborted promptly. The advantage of volatile transactions is that they don't require separate read, wait, and execution phases. Instead, they are executed atomically in a single phase with 1RTT storage latency from the start of the distributed commit to the successful reply. This means they usually don't slow down the pipeline and can increase transaction throughput. Since any shard can abort a volatile transaction without waiting for a planning deadline, it also limits the unavailability of the shard during partitioning and schema changes.

Volatile transactions are based on persistent uncommitted changes in the local database. During the execution phase, DataShard optimistically assumes that all remote locks will be validated, applies effects in the form of uncommitted changes (using the globally unique TxId of the distributed transaction), and adds the transaction record to the VolatileTxManager in a Waiting state for other participants to make decisions. A successful reply is only sent when all transaction effects are persistent, and a committed decision has been received from all other participants. Reads use the TxMap to observe all pending changes and check their status using the TxObserver. When a read result depends on the outcome of a volatile transaction, operations subscribe to its status and restart after it has been decided (either by reading committed changes or skipping them if the transaction aborts).

Having uncommitted and not yet aborted changes limits the shard's ability to perform blind writes. Because uncommitted changes must be committed in the same order in which they were applied to any given key, and shards don't keep these keys in memory after transactions have been executed, DataShard needs to read each key and detect conflicts before it can perform any write. These conflicting changes may be uncommitted changes associated with an optimistic lock (and these locks must be broken along with rolling back the changes) or uncommitted changes from waiting for volatile transactions. We don't want to block writes unnecessarily, so even non-volatile operations can switch to a volatile commit. Such operations allocate a GlobalTxId when necessary (this is a per-cluster unique TxId that is needed when the request doesn't provide one, such as in BulkUpsert) and write changes to conflicting keys as uncommitted. The transaction record is then added to the VolatileTxManager without specifying other participants and becomes initially committed. It also specifies a list of dependencies, which are transactions that must be completed before the transaction can be committed. These transactions don't block reads and are eventually committed in the local database after all their dependencies have been completed or aborted.

To reduce stalls in the pipeline, transactions use the conflict cache for keys that are declared for writes in distributed transactions. These keys are read while transactions are waiting in the queue, and conflicting transaction sets are cached by the RegisterDistributedWrites function call. All writes to cached keys update these conflict sets and keep them up-to-date. This allows distributed transactions with writes to execute faster by processing lists of conflicting transactions using a hash table lookup even when the table data is evicted from memory.

DataShard may have change collectors, such as async indexes and/or Change Data Capture (CDC). Collecting these changes for volatile transactions is similar to uncommitted changes in transactions, generating a stream of uncommitted change records using the TxId as its LockId. These records are then either added atomically to shard change records upon commit or deleted upon abort. Depending on the settings of the change collector, it may also need to read the current row state. Suppose this row state depends on other volatile transactions that are waiting. In that case, it is handled similarly to any other read by adding a dependency and restarting when these dependencies are resolved. This can cause the transaction pipeline to stall, but it's conceptually similar to persistent distributed transactions with ReadSets that read-write conflicts can also stall.

Volatile transactions also generate and write to disk OutReadSets for all other participants specified in the ReceivingShards sets in the same local database transaction, which writes uncommitted effects. When these uncommitted changes become persistent, these ReadSets are sent to other participants, notifying them that this shard is ready to commit the distributed transaction and will not change its decision until the transaction is aborted by another participant.

Shards forget about volatile transactions that do not persist in their transaction records when they are restarted and will not send outgoing ReadSets for transactions they no longer know anything about. Shards that successfully execute the transaction body send a special event called ReadSet Expectation to inform about them waiting for an incoming ReadSet. This is done even before the effects are persisted, so the shards have a chance to find out about aborted transactions as early as possible. Restarted shards will receive a request for a transaction they don't know about and respond with a special ReadSet without data. In this way, all participants will find out about the aborted transaction and abort it as well.

Volatile transaction guarantees

To reiterate, volatile transactions are stored in memory by coordinators and participants and may fail for various reasons. It's important to understand why a volatile transaction either commits or aborts for all participants and why it's not possible for a transaction to commit only for a subset of participants.

Some examples of potential sources of error for the transaction:

  1. Any shard can unexpectedly restart, including when a new instance starts while the old one is still running and is unaware of the new one. Since transactions are only stored in memory, the new instance may not be aware of transactions that failed to save their effects to disk. A transaction may also have been successfully executed and committed without leaving any traces except for its committed effects.
  2. The coordinator may unexpectedly restart without transferring its in-memory state, and the transaction may have been sent to some participants but not all of them.
  3. Any shard might have decided to abort the transaction due to an error.

A vital guarantee is that if any shard successfully persists, uncommitted changes, and a transaction record, and if it receives a DECISION_COMMIT from all other participants, the transaction will be eventually committed at all participants and can't be rolled back. It's also important to note that if any shard aborts a transaction due to an error (such as forgetting about the transaction after restarting), the transaction will never be able to complete and will eventually roll back.

These guarantees are based on the following:

  1. ReadSets with DECISION_COMMIT are sent to other participants only after all uncommitted changes, outgoing ReadSets, and a transaction record have been persisted to disk. Therefore, DECISION_COMMIT messages are persistent and will be delivered to other participants until acknowledged. Even if another shard aborts the transaction, these messages will continue to be delivered.
  2. ReadSets are only acknowledged either when a shard successfully writes to disk or when a read-only lease is active and an unexpected ReadSet is received. Specifically, an older tablet instance will not acknowledge a ReadSet for a transaction already prepared and executed by a newer tablet. The local database ensures that a new tablet will not be activated until the read-only lease on older tablets has expired (provided that the monotonic clock frequency is not more than twice as fast). A successful write confirms that the tablet held the storage lock at the beginning of the commit, and no newer tablets were running simultaneously.
    • Volatile transactions use an optimization where received ReadSets are not written to disk. Acknowledgment is sent only after a commit or an abort (removal) of the transaction record has been fully persisted on disk.
    • When all participants have generated and persisted in their outgoing DECISION_COMMIT ReadSets, they will continue to receive the complete set of commit decisions until they have fully committed the transaction effects and sent their acknowledgments.
  3. Any message related to an aborted transaction is only sent after the abort has been written to disk. This is necessary to handle situations where multiple generations of a particular shard are running at the same time. After a newer generation has successfully committed a transaction and acknowledged ReadSets, an older generation (which would fail when trying to access the disk or validate a read-only lease) may receive a NO_DATA reply to its ReadSet Expectation (which was acknowledged by the newer generation). However, the older generation will not be able to commit the removal of the transaction record and cannot reply with an error incorrectly. When the removal of the transaction record (and the transaction effects) have been fully committed, the current generation can be assured that newer generations won't start with the transaction again, and it won't commit.
  4. After collecting all DECISION_COMMIT ReadSets, a successful reply does not need to wait for the final commit in the local database. Instead, it has to wait until the uncommitted effects and the transaction record have been persisted. This allows the successful reply to have a 1RTT storage latency on the critical path caused by writing uncommitted changes and the transaction record. This successful response is stable:
    • If a shard is restarted, its effects and transaction record will be persistent. In the worst case, it will be recovered in the Waiting state.
    • From the MVCC perspective, the transaction has already been completed, so any new reads will have an MVCC version that includes these changes. Any new read will begin waiting for the transaction to resolve, which is now guaranteed to succeed.
    • The restored Waiting transaction could not have its incoming ReadSets acknowledged, so they will be resent. Since we received DECISION_COMMIT earlier, they will be received again, and the transaction will quickly move to the Committed state.
  5. As long as the transaction is in the queue, and especially while its PlanStep is not yet known, incoming ReadSets are stored in memory and will not be acknowledged. Acknowledgments are delayed until the transaction has been fully committed or is aborted prematurely (in which case, no changes have been made, and no transaction record has been committed, so newer generations cannot recover or commit the transaction).

Volatile distributed transactions also ensure that changes are visible and stable across multiple shards participating in the same distributed transaction. For instance, persistent distributed transactions allow the following anomaly:

  1. Distributed transaction Tx1 performs a blind write to keys x and y in two different shards.
  2. Single-shard transaction Tx2 reads key x and observes a value written by Tx1.
  3. Single-shard transaction Tx3 (which starts after Tx2 has completed) reads key y and does not observe a value written by Tx1.

This anomaly can occur when a shard with key x quickly receives a transaction plan step with Tx1 and performs a write. A subsequent read in Tx2, which arrives a bit later, will have its local MVCC version include changes to key x and observe the change. However, the shard with key y may be running on a slower node and may not be aware of the Tx1 plan step when a subsequent read in Tx3 arrives. Since the time cast bucket doesn't have Tx1 acknowledged, its mediator time will remain in the past, and Tx3 will have a local MVCC version that doesn't include changes to key y. From the user's perspective, Tx2 must happen before Tx3, but the global serializable order turns out to be Tx3, Tx1, and Tx2.

Volatile transactions cannot observe this particular anomaly because a read that observes changes to key x must have received DECISION_COMMIT from the other shard with key y. This means that the transaction record has been persisted in both shards and, crucially, the shard with key y has marked it as completed. A read from key y will choose a local MVCC version that includes changes made by Tx1 and will need to wait until Tx1 is resolved.

In other words, if a client observes the result of a volatile transaction at a certain point in time, all subsequent reads will also observe the result of that volatile transaction, and changes stability is not violated.

Indirect planning of volatile transactions

The coordinator restart may cause the plan step to only reach a subset of the participants in a volatile distributed transaction. As a result, some shards may execute the transaction and begin waiting for ReadSets, while other shards continue waiting for the plan step to arrive. Due to the planning timeout of 30 seconds, some transactions could experience excessive delays before eventually aborting.

When any participant receives the first plan step of a volatile transaction, they will include the PlanStep in the ReadSets they send to other participants. The other participants then indirectly learn about the PlanStep assigned to the transaction and remember it as the PredictedPlanStep. Even if their own plan step is lost, DataShard will add the transaction to the PlanQueue when its mediator time (directly or indirectly) reaches the PredictedPlanStep, as if they had received a plan step with that predicted PlanStep and TxId. If the PredictedPlanStep is in the past, the transaction will be quickly aborted, as if it had reached the planning deadline.

Since the same transaction can be assigned to multiple plan steps, with some getting lost, different participants may have a distinct PlanStep assigned to the same transaction. The same transaction will then try to execute at different timestamps. DataShards verify that DECISION_COMMIT messages have their PlanStep matching their assigned PlanStep and only process them if the steps match. The transaction will be aborted if there is a mismatch in the plan steps.