Lesson 9
Transactions & ACID
Isolation models and optimistic paths.
Learning Objectives
- Master ACID properties and their implementation
- Understand isolation levels and their trade-offs
- Implement Multi-Version Concurrency Control (MVCC)
- Build snapshot isolation for repeatable reads
- Detect and prevent transaction conflicts
- Implement deadlock detection and prevention
Lesson 9.1: Transaction Fundamentals
ACID Properties in Key-Value Stores
ACID guarantees are fundamental to databases. Let’s look at each property and how to implement it in our key-value store.
Atomicity
All or nothing: a transaction either completes fully or rolls back fully.
Transfer $100 from Account A to Account B:
Step 1: Deduct $100 from A
Step 2: Add $100 to B
Both happen or neither happens.
If the system crashes between the steps, step 1 is rolled back.
Consistency
Data remains in a valid state
Constraint: Balance >= 0
Before: A=$50, B=$50
Transaction: Transfer $100 (would violate constraint)
Result: Transaction rejected (consistency preserved)
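The atomicity and consistency examples above can be sketched together in Go. This is a minimal illustration, not the lesson's engine: `Accounts` and `Transfer` are hypothetical names, and the store is a plain in-memory map. Both balance changes are staged first, the `Balance >= 0` constraint is checked on the staged values, and only then are the changes applied.

```go
package main

import (
	"errors"
	"fmt"
)

// Accounts is a hypothetical in-memory store: account name -> balance in dollars.
type Accounts map[string]int

// Transfer stages both balance changes in a scratch map and copies them into
// the store only after every check has passed.
func Transfer(s Accounts, from, to string, amount int) error {
	if from == to {
		return errors.New("source and destination must differ")
	}
	fromBal, ok := s[from]
	if !ok {
		return fmt.Errorf("unknown account %q", from)
	}
	toBal, ok := s[to]
	if !ok {
		return fmt.Errorf("unknown account %q", to)
	}
	staged := map[string]int{
		from: fromBal - amount, // Step 1: deduct
		to:   toBal + amount,   // Step 2: add
	}
	// Consistency: enforce Balance >= 0 before anything is applied.
	for name, bal := range staged {
		if bal < 0 {
			return fmt.Errorf("balance of %s would be %d", name, bal)
		}
	}
	// Atomicity: both changes are applied together, or (on any error above) neither.
	for name, bal := range staged {
		s[name] = bal
	}
	return nil
}

func main() {
	s := Accounts{"A": 50, "B": 50}
	fmt.Println(Transfer(s, "A", "B", 100), s) // rejected, balances unchanged
	fmt.Println(Transfer(s, "A", "B", 30), s)  // applied to both accounts
}
```

Staging the writes is the same idea the transaction engine later in this lesson uses with its write buffer: nothing reaches shared state until commit.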
Isolation
Concurrent transactions don’t interfere with each other
Transaction 1: Read Balance A
Transaction 2: Modify Balance A
Without isolation:
T1 reads $100
T2 modifies to $50
T1 uses stale $100 (inconsistency!)
With isolation:
T1 reads $100
T2 waits or sees snapshot
Consistent view
Durability
Committed data survives crashes
Transaction: Transfer $100, committed
Power failure 1 second later
System restarts
Data still shows transfer: Durable!
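One common way to get durability is to append a commit record to a log file and force it to disk before acknowledging the commit. The sketch below assumes a simple line-per-record log; `AppendCommit`, `Replay`, and `tempLogPath` are hypothetical names, and a real log would also need checksums and handling for partially written records.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
)

// AppendCommit appends one commit record to the log file and calls Sync so the
// record is flushed to stable storage before the caller reports success.
func AppendCommit(path, record string) error {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return err
	}
	defer f.Close()
	if _, err := f.WriteString(record + "\n"); err != nil {
		return err
	}
	return f.Sync()
}

// Replay returns every record in the log, as a restart would read them.
func Replay(path string) ([]string, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var records []string
	sc := bufio.NewScanner(f)
	for sc.Scan() {
		records = append(records, sc.Text())
	}
	return records, sc.Err()
}

// tempLogPath creates a scratch directory for the demo and returns a log path
// inside it plus a cleanup function.
func tempLogPath() (string, func(), error) {
	dir, err := os.MkdirTemp("", "commitlog")
	if err != nil {
		return "", nil, err
	}
	return filepath.Join(dir, "commit.log"), func() { os.RemoveAll(dir) }, nil
}

func main() {
	path, cleanup, err := tempLogPath()
	if err != nil {
		fmt.Println("setup failed:", err)
		return
	}
	defer cleanup()
	if err := AppendCommit(path, "txn=1 transfer A->B 100"); err != nil {
		fmt.Println("commit failed:", err)
		return
	}
	records, err := Replay(path)
	fmt.Println(records, err)
}
```

The important ordering is that `Sync` returns before the transaction is reported as committed; a record that only sits in an operating system buffer can be lost in the power failure described above.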
Isolation Levels
Each level trades isolation guarantees against performance:
Read Uncommitted (Dirty Reads Allowed)
Transaction A: Read X=100
Transaction B: Modify X to 50 (not committed)
Transaction A: Sees X=50 (DIRTY READ!)
Transaction B: Rollback
Transaction A: Acted on X=50, a value that was never committed (consistency violation!)
Isolation Level: Lowest
Consistency: Lowest
Performance: Highest
Read Committed (No Dirty Reads)
Transaction A: Read X=100
Transaction B: Modify X to 50, COMMIT
Transaction A: Read X again = 50 (NON-REPEATABLE READ)
Same transaction sees different values!
Isolation Level: Medium
Consistency: Medium
Performance: Good
Repeatable Read (No Non-Repeatable Reads)
Transaction A: Read X=100
Transaction B: Modify X to 50, COMMIT
Transaction A: Read X again = 100 (REPEATABLE!)
But phantom reads are still possible:
Transaction A: Range query (0-100)
Transaction B: Insert record at 75, COMMIT
Transaction A: Range query again = includes new record
Isolation Level: High
Consistency: High
Performance: Moderate
Serializable (Highest Isolation)
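Transactions produce the same result as if they had run serially, one after another.
Transaction A: Read/Write X, Y
Transaction B: Read/Write X, Y
The outcome matches some serial order: A then B, or B then A. The engine may still interleave them internally, as long as the result is equivalent to one of those orders.
Slowest, but safest.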
Isolation Level: Highest
Consistency: Highest
Performance: Lowest
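The four levels above can be summarized as a lookup from level to the anomalies it still permits. The sketch below encodes the standard hierarchy the lesson walks through; the type and function names (`Level`, `Anomaly`, `Allows`, `Weakest`) are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

type Level int

const (
	ReadUncommitted Level = iota
	ReadCommitted
	RepeatableRead
	Serializable
)

type Anomaly int

const (
	DirtyRead Anomaly = iota
	NonRepeatableRead
	PhantomRead
)

// permitted lists, for each level, the anomalies that can still occur.
var permitted = map[Level][]Anomaly{
	ReadUncommitted: {DirtyRead, NonRepeatableRead, PhantomRead},
	ReadCommitted:   {NonRepeatableRead, PhantomRead},
	RepeatableRead:  {PhantomRead},
	Serializable:    {},
}

// Allows reports whether anomaly a can occur at isolation level l.
func Allows(l Level, a Anomaly) bool {
	for _, p := range permitted[l] {
		if p == a {
			return true
		}
	}
	return false
}

// Weakest returns the lowest isolation level that rules out every anomaly in avoid.
func Weakest(avoid ...Anomaly) Level {
	for l := ReadUncommitted; l <= Serializable; l++ {
		ok := true
		for _, a := range avoid {
			if Allows(l, a) {
				ok = false
			}
		}
		if ok {
			return l
		}
	}
	return Serializable
}

func main() {
	fmt.Println(Allows(ReadCommitted, DirtyRead))
	fmt.Println(Weakest(NonRepeatableRead) == RepeatableRead)
}
```

Picking the weakest level that excludes the anomalies an application cannot tolerate is the practical form of the performance trade-off described above.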
Multi-Version Concurrency Control (MVCC)
Problem: Locking reduces concurrency
Write lock held on row during transaction:
T1: Lock row → Read → Process → Update (long operation)
T2: Wants to read same row → BLOCKED for entire duration!
Solution: MVCC - multiple versions allow readers and writers to coexist
Instead of locking, maintain multiple versions:
Row "user:1" versions:
v1 (tx_id=100): {"name": "alice", "age": 30}
v2 (tx_id=105): {"name": "alice", "age": 31}
v3 (tx_id=110): {"name": "alice", "age": 32}
Transaction A (start at v2):
Reads: Always sees v2 (consistent snapshot)
Transaction B (start at v3):
Modifies to v4: {"name": "alice", "age": 33}
Both proceed concurrently!
Version Chain
Every row has version chain:
Key: user:1
v1 → v2 → v3 → v4 → ...
Each version tagged with:
- Transaction ID (who created it)
- Commit timestamp
- Visible to which transactions
Readers: Follow chain to find visible version
Writers: Append new version
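The reader rule above ("follow the chain to find the visible version") can be tried out on the `user:1` example. This is a simplified sketch: it assumes transaction IDs double as commit order, uses hypothetical `version` and `visible` names, and invents the ID 115 for Transaction B's new version.

```go
package main

import "fmt"

// version is one entry in a key's version chain.
type version struct {
	txID  uint64 // ID of the transaction that created this version
	value string
}

// visible returns the newest version created at or before the snapshot,
// walking the chain from newest to oldest.
func visible(chain []version, snapshot uint64) (version, bool) {
	for i := len(chain) - 1; i >= 0; i-- {
		if chain[i].txID <= snapshot {
			return chain[i], true
		}
	}
	return version{}, false
}

func main() {
	chain := []version{
		{100, `{"name": "alice", "age": 30}`},
		{105, `{"name": "alice", "age": 31}`},
		{110, `{"name": "alice", "age": 32}`},
	}
	// Transaction B appends v4; 115 is a made-up transaction ID for this sketch.
	chain = append(chain, version{115, `{"name": "alice", "age": 33}`})

	a, _ := visible(chain, 105) // Transaction A started at v2
	fmt.Println("A sees:", a.value)
	latest, _ := visible(chain, 115) // a transaction that starts after B commits
	fmt.Println("new reader sees:", latest.value)
}
```

Appending v4 never disturbs Transaction A, because A's lookup stops at the first version whose ID is within its snapshot.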
Snapshot Isolation
Snapshot Isolation: Each transaction operates on a consistent snapshot
Global state at time T0:
user:1 = alice
user:2 = bob
user:3 = charlie
txn_id = 110
Transaction starts at T0:
Snapshot: {user:1=alice, user:2=bob, user:3=charlie}
All reads return from this snapshot
Modifications create new version
At T1, global state changes (other transactions):
user:1 = alice_new
user:2 = bob_new
txn_id = 115
Transaction still reads snapshot values:
Repeatable reads guaranteed!
But new transactions see new values
Lesson 9.2: Implementation - Core Transaction Engine
Transaction Structure
package transaction
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
// TransactionID uniquely identifies transaction
type TransactionID uint64
// VersionNumber tracks versions for MVCC
type VersionNumber uint64
// Transaction represents active transaction
type Transaction struct {
ID TransactionID
StartTime time.Time
StartVersion VersionNumber
Status TxnStatus
ReadSet map[string]VersionNumber // key → version read
WriteSet map[string][]byte // key → new value
DeleteSet map[string]bool // keys marked for deletion
mu sync.RWMutex
}
type TxnStatus int
const (
TxnPending TxnStatus = iota
TxnCommitted
TxnRolledback
TxnAborted // Due to conflict
)
func (t *Transaction) String() string {
return fmt.Sprintf("Txn[id=%d, status=%d, startVer=%d]", t.ID, t.Status, t.StartVersion)
}
// VersionedValue stores multiple versions
type VersionedValue struct {
Version VersionNumber
Value []byte
CreatedBy TransactionID
CreatedAt time.Time
IsDeleted bool
}
// VersionChain manages all versions of a key
type VersionChain struct {
Key string
Versions []*VersionedValue
mu sync.RWMutex
}
// GetVisibleVersion returns version visible to transaction
func (vc *VersionChain) GetVisibleVersion(txn *Transaction) (*VersionedValue, error) {
vc.mu.RLock()
defer vc.mu.RUnlock()
// Walk backwards through versions (newest first)
for i := len(vc.Versions) - 1; i >= 0; i-- {
v := vc.Versions[i]
// Version must be created before transaction started
if v.Version <= txn.StartVersion {
if !v.IsDeleted {
return v, nil
}
// Key was deleted
return nil, fmt.Errorf("key deleted at version %d", v.Version)
}
}
return nil, fmt.Errorf("key not found")
}
// AppendVersion adds new version
func (vc *VersionChain) AppendVersion(v *VersionedValue) {
vc.mu.Lock()
defer vc.mu.Unlock()
vc.Versions = append(vc.Versions, v)
}
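// CleanupOldVersions removes versions that no active transaction can still read.
// oldestActiveVersion is the StartVersion of the oldest active transaction.
func (vc *VersionChain) CleanupOldVersions(oldestActiveVersion VersionNumber) int {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	// The newest version at or below oldestActiveVersion is still visible to the
	// oldest snapshot, so it and everything after it must be kept
	keepFrom := 0
	for i, v := range vc.Versions {
		if v.Version <= oldestActiveVersion {
			keepFrom = i
		}
	}
	removed := keepFrom
	// Copy the survivors so the dropped versions can be garbage collected
	vc.Versions = append([]*VersionedValue(nil), vc.Versions[keepFrom:]...)
	return removed
}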
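One detail worth checking in any version garbage collector: the newest version at or below the oldest active snapshot has to survive, because that snapshot still reads it. The sketch below shows the rule on a plain slice; `version`, `prune`, and `horizon` are hypothetical names, and the chain is assumed to be sorted by ascending version number.

```go
package main

import "fmt"

type version struct {
	num   uint64
	value string
}

// prune drops versions that no active snapshot can read. horizon is the start
// version of the oldest active transaction.
func prune(chain []version, horizon uint64) []version {
	// Find the newest version with num <= horizon; the oldest snapshot reads it.
	keepFrom := 0
	for i, v := range chain {
		if v.num <= horizon {
			keepFrom = i
		}
	}
	// Return a copy so the dropped versions are no longer referenced.
	return append([]version(nil), chain[keepFrom:]...)
}

func main() {
	chain := []version{{1, "a"}, {5, "b"}, {9, "c"}}
	// Oldest active transaction started at version 7: it reads version 5,
	// so only version 1 is safe to drop.
	fmt.Println(prune(chain, 7))
	// No version is at or below horizon 0, so nothing is dropped.
	fmt.Println(prune(chain, 0))
}
```

Dropping every version below the horizon instead would delete version 5 in the first call and make the key unreadable for the transaction that started at version 7.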
Transaction Manager
// TransactionManager coordinates transactions
type TransactionManager struct {
nextTxnID uint64
nextVersionNumber VersionNumber
activeTxns map[TransactionID]*Transaction
versionChains map[string]*VersionChain
mu sync.RWMutex
conflictDetector *ConflictDetector
deadlockDetector *DeadlockDetector
undoLog *UndoLog
}
func NewTransactionManager() *TransactionManager {
return &TransactionManager{
activeTxns: make(map[TransactionID]*Transaction),
versionChains: make(map[string]*VersionChain),
conflictDetector: NewConflictDetector(),
deadlockDetector: NewDeadlockDetector(),
undoLog: NewUndoLog(),
}
}
// BeginTxn starts new transaction
func (tm *TransactionManager) BeginTxn(ctx context.Context) (*Transaction, error) {
tm.mu.Lock()
defer tm.mu.Unlock()
txnID := TransactionID(atomic.AddUint64(&tm.nextTxnID, 1))
txn := &Transaction{
ID: txnID,
StartTime: time.Now(),
StartVersion: VersionNumber(atomic.LoadUint64((*uint64)(&tm.nextVersionNumber))),
Status: TxnPending,
ReadSet: make(map[string]VersionNumber),
WriteSet: make(map[string][]byte),
DeleteSet: make(map[string]bool),
}
tm.activeTxns[txnID] = txn
return txn, nil
}
// Get retrieves value within transaction
func (tm *TransactionManager) Get(txn *Transaction, key string) ([]byte, error) {
	// Look up the version chain before taking txn.mu, so tm.mu and txn.mu are
	// never held together here (CommitTxn and RollbackTxn take tm.mu first)
	tm.mu.RLock()
	chain, exists := tm.versionChains[key]
	tm.mu.RUnlock()
	txn.mu.Lock()
	defer txn.mu.Unlock()
	if txn.Status != TxnPending {
		return nil, fmt.Errorf("transaction not pending")
	}
	// Check write buffer first (read-your-writes)
	if val, ok := txn.WriteSet[key]; ok {
		return val, nil
	}
	// Check delete buffer
	if txn.DeleteSet[key] {
		return nil, fmt.Errorf("key marked for deletion")
	}
	if !exists {
		return nil, fmt.Errorf("key not found")
	}
	// Get the version visible to this transaction's snapshot
	version, err := chain.GetVisibleVersion(txn)
	if err != nil {
		return nil, err
	}
	// Record read
	txn.ReadSet[key] = version.Version
	return version.Value, nil
}
// Put writes value (buffered)
func (tm *TransactionManager) Put(txn *Transaction, key string, value []byte) error {
if txn.Status != TxnPending {
return fmt.Errorf("transaction not pending")
}
txn.mu.Lock()
defer txn.mu.Unlock()
// Remove from delete set if present
delete(txn.DeleteSet, key)
// Add to write set
txn.WriteSet[key] = value
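// Log to undo log for potential rollback. UndoLog is assumed to be defined
// elsewhere in the course code; because writes stay buffered until commit,
// discarding WriteSet is already enough to undo them.
tm.undoLog.LogWrite(txn.ID, key, value)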
return nil
}
// Delete marks key for deletion
func (tm *TransactionManager) Delete(txn *Transaction, key string) error {
if txn.Status != TxnPending {
return fmt.Errorf("transaction not pending")
}
txn.mu.Lock()
defer txn.mu.Unlock()
// Remove from write set if present
delete(txn.WriteSet, key)
// Add to delete set
txn.DeleteSet[key] = true
return nil
}
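// CommitTxn validates and applies writes
func (tm *TransactionManager) CommitTxn(txn *Transaction) error {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	if txn.Status != TxnPending {
		return fmt.Errorf("transaction not pending: %v", txn.Status)
	}
	abort := func(err error) error {
		txn.Status = TxnAborted
		delete(tm.activeTxns, txn.ID)
		return err
	}
	// First-committer-wins: a key has a write-write conflict if another
	// transaction committed a version of it after this snapshot was taken
	stale := func(key string) bool {
		chain, ok := tm.versionChains[key]
		if !ok {
			return false
		}
		chain.mu.RLock()
		defer chain.mu.RUnlock()
		n := len(chain.Versions)
		return n > 0 && chain.Versions[n-1].Version > txn.StartVersion
	}
	for key := range txn.WriteSet {
		if stale(key) {
			return abort(fmt.Errorf("conflict detected: write-write conflict on key %s", key))
		}
	}
	for key := range txn.DeleteSet {
		if stale(key) {
			return abort(fmt.Errorf("conflict detected: write-write conflict on key %s", key))
		}
	}
	// Check for conflicts with transactions tracked by the detector
	if err := tm.conflictDetector.CheckConflict(txn); err != nil {
		return abort(fmt.Errorf("conflict detected: %w", err))
	}
	// Check for deadlocks
	if err := tm.deadlockDetector.CheckDeadlock(txn); err != nil {
		return abort(fmt.Errorf("deadlock detected: %w", err))
	}
	// Get new version number for all writes
	newVersion := VersionNumber(atomic.AddUint64((*uint64)(&tm.nextVersionNumber), 1))
	now := time.Now()
	// Apply writes
	for key, value := range txn.WriteSet {
		tm.getOrCreateChain(key).AppendVersion(&VersionedValue{
			Version: newVersion, Value: value, CreatedBy: txn.ID, CreatedAt: now,
		})
	}
	// Apply deletes as tombstone versions
	for key := range txn.DeleteSet {
		tm.getOrCreateChain(key).AppendVersion(&VersionedValue{
			Version: newVersion, CreatedBy: txn.ID, CreatedAt: now, IsDeleted: true,
		})
	}
	txn.Status = TxnCommitted
	delete(tm.activeTxns, txn.ID)
	return nil
}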
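// RollbackTxn discards transaction
func (tm *TransactionManager) RollbackTxn(txn *Transaction) error {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	txn.mu.Lock()
	defer txn.mu.Unlock()
	// A committed or aborted transaction must not be relabelled as rolled back
	if txn.Status != TxnPending {
		return fmt.Errorf("transaction not pending: %v", txn.Status)
	}
	// Clear write/delete buffers; nothing was applied, so this undoes everything
	txn.WriteSet = make(map[string][]byte)
	txn.DeleteSet = make(map[string]bool)
	txn.ReadSet = make(map[string]VersionNumber)
	txn.Status = TxnRolledback
	delete(tm.activeTxns, txn.ID)
	return nil
}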
func (tm *TransactionManager) getOrCreateChain(key string) *VersionChain {
if chain, ok := tm.versionChains[key]; ok {
return chain
}
chain := &VersionChain{Key: key, Versions: make([]*VersionedValue, 0)}
tm.versionChains[key] = chain
return chain
}
Conflict Detection
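// ConflictDetector checks for transaction conflicts.
// CheckConflict only sees transactions whose keys have been recorded in
// readSet and writeSet; the code in this lesson does not show that recording
// step, so it is left to the implementer.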
type ConflictDetector struct {
readSet map[TransactionID]map[string]bool // txn → set of keys read
writeSet map[TransactionID]map[string]bool // txn → set of keys written
mu sync.RWMutex
}
func NewConflictDetector() *ConflictDetector {
return &ConflictDetector{
readSet: make(map[TransactionID]map[string]bool),
writeSet: make(map[TransactionID]map[string]bool),
}
}
// CheckConflict detects conflicts
func (cd *ConflictDetector) CheckConflict(txn *Transaction) error {
cd.mu.RLock()
defer cd.mu.RUnlock()
txn.mu.RLock()
defer txn.mu.RUnlock()
// Check write-write conflicts
for key := range txn.WriteSet {
for otherID, otherWrites := range cd.writeSet {
if otherID != txn.ID && otherWrites[key] {
return fmt.Errorf("write-write conflict on key %s with txn %d", key, otherID)
}
}
}
// Check write-read conflicts
for key := range txn.WriteSet {
for otherID, otherReads := range cd.readSet {
if otherID != txn.ID && otherReads[key] {
return fmt.Errorf("write-read conflict on key %s with txn %d", key, otherID)
}
}
}
// Check read-write conflicts
for key := range txn.ReadSet {
for otherID, otherWrites := range cd.writeSet {
if otherID != txn.ID && otherWrites[key] {
return fmt.Errorf("read-write conflict on key %s with txn %d", key, otherID)
}
}
}
return nil
}
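A common alternative for detecting write-write conflicts under snapshot isolation is first-committer-wins validation: at commit time, compare the latest committed version of each written key with the version at which the transaction started. The sketch below is a standalone illustration with hypothetical `store` and `txn` types, not the lesson's `ConflictDetector`, and it is not safe for concurrent use without a lock.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrWriteConflict is a hypothetical sentinel error for this sketch.
var ErrWriteConflict = errors.New("write-write conflict")

// store tracks, per key, the version number of its latest committed write.
type store struct {
	clock  uint64
	latest map[string]uint64
}

type txn struct {
	start  uint64          // value of store.clock when the transaction began
	writes map[string]bool // keys the transaction intends to write
}

func (s *store) begin() *txn {
	return &txn{start: s.clock, writes: map[string]bool{}}
}

// commit rejects the transaction if any key it writes was committed by
// someone else after the transaction's snapshot; otherwise it publishes the
// writes under a new version number.
func (s *store) commit(t *txn) error {
	for key := range t.writes {
		if s.latest[key] > t.start {
			return fmt.Errorf("%w on key %q", ErrWriteConflict, key)
		}
	}
	s.clock++
	for key := range t.writes {
		s.latest[key] = s.clock
	}
	return nil
}

func main() {
	s := &store{latest: map[string]uint64{}}
	a, b := s.begin(), s.begin()
	a.writes["key"] = true
	b.writes["key"] = true
	fmt.Println("A:", s.commit(a)) // first committer succeeds
	err := s.commit(b)             // second committer started before A's commit
	fmt.Println("B:", err, errors.Is(err, ErrWriteConflict))
}
```

This rule lets the first of two overlapping writers commit and aborts the second, and it leaves read-only transactions untouched.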
Deadlock Detection
// DeadlockDetector detects circular waits in the wait-for graph
type DeadlockDetector struct {
waitGraph map[TransactionID]map[TransactionID]bool // txn A → txns it waits for
mu sync.RWMutex
}
func NewDeadlockDetector() *DeadlockDetector {
return &DeadlockDetector{
waitGraph: make(map[TransactionID]map[TransactionID]bool),
}
}
// RecordWait records that txnA waits for txnB
func (dd *DeadlockDetector) RecordWait(txnA, txnB TransactionID) error {
	dd.mu.Lock()
	defer dd.mu.Unlock()
	if dd.waitGraph[txnA] == nil {
		dd.waitGraph[txnA] = make(map[TransactionID]bool)
	}
	dd.waitGraph[txnA][txnB] = true
	// Check for cycle
	if dd.hasCycle(txnA) {
		// Remove the edge again so the graph stays acyclic after the caller aborts
		delete(dd.waitGraph[txnA], txnB)
		return fmt.Errorf("deadlock detected: cycle involving txn %d", txnA)
	}
	return nil
}
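// CheckDeadlock reports an error if a cycle is reachable from txn in the
// wait-for graph. CommitTxn calls this before applying writes.
func (dd *DeadlockDetector) CheckDeadlock(txn *Transaction) error {
	dd.mu.RLock()
	defer dd.mu.RUnlock()
	if dd.hasCycle(txn.ID) {
		return fmt.Errorf("cycle reachable from txn %d", txn.ID)
	}
	return nil
}

// hasCycle detects cycle using DFS; the caller must hold dd.mu
func (dd *DeadlockDetector) hasCycle(start TransactionID) bool {
	visited := make(map[TransactionID]bool)
	recStack := make(map[TransactionID]bool)
	return dd.dfs(start, visited, recStack)
}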
func (dd *DeadlockDetector) dfs(node TransactionID, visited, recStack map[TransactionID]bool) bool {
visited[node] = true
recStack[node] = true
for neighbor := range dd.waitGraph[node] {
if !visited[neighbor] {
if dd.dfs(neighbor, visited, recStack) {
return true
}
} else if recStack[neighbor] {
return true // Back edge = cycle
}
}
recStack[node] = false
return false
}
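To see a wait-for graph reject a deadlock, here is a compact standalone sketch. It uses a hypothetical `waitGraph` type rather than the `DeadlockDetector` above, refuses any edge that would close a cycle, and adds a `release` step for transactions that finish.

```go
package main

import "fmt"

// waitGraph maps a transaction ID to the set of transactions it is waiting for.
type waitGraph map[uint64]map[uint64]bool

// reaches reports whether to can be reached from from by following wait edges.
func (g waitGraph) reaches(from, to uint64, seen map[uint64]bool) bool {
	if from == to {
		return true
	}
	seen[from] = true
	for next := range g[from] {
		if !seen[next] && g.reaches(next, to, seen) {
			return true
		}
	}
	return false
}

// addWait records "waiter waits for holder" unless that edge would close a cycle.
func (g waitGraph) addWait(waiter, holder uint64) error {
	if g.reaches(holder, waiter, map[uint64]bool{}) {
		return fmt.Errorf("deadlock: txn %d waiting for txn %d would close a cycle", waiter, holder)
	}
	if g[waiter] == nil {
		g[waiter] = map[uint64]bool{}
	}
	g[waiter][holder] = true
	return nil
}

// release removes a finished transaction and every edge that points at it.
func (g waitGraph) release(id uint64) {
	delete(g, id)
	for _, waits := range g {
		delete(waits, id)
	}
}

func main() {
	g := waitGraph{}
	fmt.Println(g.addWait(1, 2)) // T1 waits for T2
	fmt.Println(g.addWait(2, 3)) // T2 waits for T3
	fmt.Println(g.addWait(3, 1)) // T3 waits for T1: rejected, would form a cycle
	g.release(2)                 // T2 finishes, breaking the chain
	fmt.Println(g.addWait(3, 1)) // now accepted
}
```

Checking before inserting the edge keeps the graph acyclic at all times, so the transaction that would complete the cycle is the one that gets aborted.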
Lesson 9.3: Usage Examples and Complete Workflows
Example 1: Simple Transaction
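// Transfer money between accounts.
// parseInt and formatInt are assumed helpers that convert between []byte and int.
func TransferMoney(tm *TransactionManager, fromKey, toKey string, amount int) error {
	// Begin transaction
	txn, err := tm.BeginTxn(context.Background())
	if err != nil {
		return err
	}
	// Read balances; a failed read abandons the transfer
	fromBal, err := tm.Get(txn, fromKey)
	if err != nil {
		tm.RollbackTxn(txn)
		return err
	}
	toBal, err := tm.Get(txn, toKey)
	if err != nil {
		tm.RollbackTxn(txn)
		return err
	}
	fromVal := parseInt(fromBal)
	toVal := parseInt(toBal)
	// Validate
	if fromVal < amount {
		tm.RollbackTxn(txn)
		return fmt.Errorf("insufficient funds")
	}
	// Update balances (buffered until commit)
	if err := tm.Put(txn, fromKey, formatInt(fromVal-amount)); err != nil {
		tm.RollbackTxn(txn)
		return err
	}
	if err := tm.Put(txn, toKey, formatInt(toVal+amount)); err != nil {
		tm.RollbackTxn(txn)
		return err
	}
	// Commit; a failed commit applies nothing, so there is nothing to roll back
	return tm.CommitTxn(txn)
}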
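The examples in this section call `parseInt` and `formatInt`, which the lesson does not define. A minimal sketch, assuming balances and counters are stored as decimal ASCII text, and assuming that missing or malformed input should read as 0 so the single-value call sites keep working (production code would more likely return an error):

```go
package main

import (
	"fmt"
	"strconv"
)

// parseInt decodes a decimal integer stored as ASCII bytes.
// Assumption for this sketch: empty or malformed input yields 0.
func parseInt(b []byte) int {
	n, err := strconv.Atoi(string(b))
	if err != nil {
		return 0
	}
	return n
}

// formatInt encodes n as decimal ASCII bytes.
func formatInt(n int) []byte {
	return []byte(strconv.Itoa(n))
}

func main() {
	stored := formatInt(150)
	fmt.Println(string(stored), parseInt(stored)-100)
}
```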
Example 2: Read-Modify-Write with Retry
// IncrementCounter needs "strings" in the import list.
func IncrementCounter(tm *TransactionManager, key string, maxRetries int) error {
	for attempt := 0; attempt < maxRetries; attempt++ {
		txn, err := tm.BeginTxn(context.Background())
		if err != nil {
			return err
		}
		// Read current value
		val, err := tm.Get(txn, key)
		if err != nil {
			tm.RollbackTxn(txn)
			return err
		}
		current := parseInt(val)
		// Increment
		if err := tm.Put(txn, key, formatInt(current+1)); err != nil {
			tm.RollbackTxn(txn)
			return err
		}
		// Try to commit
		err = tm.CommitTxn(txn)
		if err == nil {
			return nil
		}
		// On conflict, retry with a fresh snapshot
		if strings.Contains(err.Error(), "conflict") {
			continue
		}
		return err
	}
	return fmt.Errorf("max retries exceeded")
}
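Matching on the error text works, but it breaks silently if the message changes. Since the commit path already wraps errors with `%w`, one option is a sentinel error checked with `errors.Is`. The sketch below is standalone: `ErrConflict` and `withRetry` are hypothetical names, and the commit step is passed in as a function.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrConflict is a hypothetical sentinel that a commit function would wrap
// whenever it aborts a transaction because of a conflict.
var ErrConflict = errors.New("transaction conflict")

// withRetry runs attempt up to maxRetries times, retrying only when the
// returned error wraps ErrConflict. Any other error is returned immediately.
func withRetry(maxRetries int, attempt func() error) error {
	for i := 0; i < maxRetries; i++ {
		err := attempt()
		if err == nil {
			return nil
		}
		if !errors.Is(err, ErrConflict) {
			return err
		}
	}
	return fmt.Errorf("max retries (%d) exceeded: %w", maxRetries, ErrConflict)
}

func main() {
	calls := 0
	err := withRetry(5, func() error {
		calls++
		if calls < 3 {
			// Simulate two conflicting commits before one succeeds.
			return fmt.Errorf("commit failed: %w", ErrConflict)
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

Each attempt should begin a new transaction inside the callback, so the retry reads from a fresh snapshot rather than the one that just conflicted.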
Example 3: Multi-Step Transaction
func ProcessOrder(tm *TransactionManager, orderedKeys []string) error {
	txn, err := tm.BeginTxn(context.Background())
	if err != nil {
		return err
	}
	// Roll back on any early return that leaves the transaction pending
	defer func() {
		if txn.Status == TxnPending {
			tm.RollbackTxn(txn)
		}
	}()
	// Step 1: Reserve inventory
	for _, key := range orderedKeys {
		qty, err := tm.Get(txn, key)
		if err != nil {
			return fmt.Errorf("read %s: %w", key, err)
		}
		current := parseInt(qty)
		if current < 1 {
			return fmt.Errorf("out of stock: %s", key)
		}
		if err := tm.Put(txn, key, formatInt(current-1)); err != nil {
			return err
		}
	}
	// Step 2: Create order record (generateID is an assumed helper)
	orderID := generateID()
	if err := tm.Put(txn, "order:"+orderID, []byte("pending")); err != nil {
		return err
	}
	// Step 3: Commit all changes atomically
	return tm.CommitTxn(txn)
}
Lab 9.1: Implement Complete MVCC Transaction Engine
Objective
Build a production-ready transaction system with MVCC, conflict detection, and deadlock prevention.
Requirements
- Transaction Support: Begin/commit/rollback operations, snapshot isolation
- MVCC Implementation: Multiple versions per key, version chains with visibility
- Conflict Detection: Write-write, write-read, read-write conflicts
- Deadlock Prevention: Wait-for graph, cycle detection
- Testing: Basic read/write transactions, snapshot isolation verification
- Benchmarks: Transaction throughput, conflict rate impact, memory overhead
Starter Code
package transaction
import (
"context"
"fmt"
"sync"
"time"
)
// Transaction represents database transaction
type Transaction struct {
ID TransactionID
StartTime time.Time
StartVersion VersionNumber
Status TxnStatus
// TODO: Add read set, write set, delete set
}
// TransactionManager coordinates transactions
type TransactionManager struct {
// TODO: Add fields for transaction tracking, version management
}
// BeginTxn starts new transaction
// TODO: Implement with snapshot capture
func (tm *TransactionManager) BeginTxn(ctx context.Context) (*Transaction, error) {
return nil, nil
}
// Get reads value within transaction
// TODO: Implement with snapshot isolation
func (tm *TransactionManager) Get(txn *Transaction, key string) ([]byte, error) {
return nil, nil
}
// Put buffers write in transaction
// TODO: Implement write buffering
func (tm *TransactionManager) Put(txn *Transaction, key string, value []byte) error {
return nil
}
// Delete buffers delete in transaction
// TODO: Implement delete buffering
func (tm *TransactionManager) Delete(txn *Transaction, key string) error {
return nil
}
// CommitTxn validates and applies writes
// TODO: Implement conflict detection and commit
func (tm *TransactionManager) CommitTxn(txn *Transaction) error {
return nil
}
// RollbackTxn discards transaction
// TODO: Implement cleanup
func (tm *TransactionManager) RollbackTxn(txn *Transaction) error {
return nil
}
Test Template
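package transaction
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func TestSnapshotIsolation(t *testing.T) {
tm := NewTransactionManager()
// Setup: initial value
txn0, _ := tm.BeginTxn(context.Background())
tm.Put(txn0, "key", []byte("initial"))
assert.NoError(t, tm.CommitTxn(txn0))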
// Transaction A: Read value
txnA, _ := tm.BeginTxn(context.Background())
v1, _ := tm.Get(txnA, "key")
assert.Equal(t, "initial", string(v1))
// Transaction B: Modify and commit
txnB, _ := tm.BeginTxn(context.Background())
tm.Put(txnB, "key", []byte("modified"))
assert.NoError(t, tm.CommitTxn(txnB))
// Transaction A: Still sees initial (snapshot isolation!)
v2, _ := tm.Get(txnA, "key")
assert.Equal(t, "initial", string(v2), "snapshot isolation violated")
// Commit A
assert.NoError(t, tm.CommitTxn(txnA))
}
func TestWriteWriteConflict(t *testing.T) {
tm := NewTransactionManager()
// Setup: initial value
txn0, _ := tm.BeginTxn(context.Background())
tm.Put(txn0, "key", []byte("initial"))
assert.NoError(t, tm.CommitTxn(txn0))
txnA, _ := tm.BeginTxn(context.Background())
txnB, _ := tm.BeginTxn(context.Background())
// Both modify same key
tm.Put(txnA, "key", []byte("valueA"))
tm.Put(txnB, "key", []byte("valueB"))
// First commit succeeds
assert.NoError(t, tm.CommitTxn(txnA))
// Second fails (write-write conflict)
err := tm.CommitTxn(txnB)
assert.Error(t, err, "expected write-write conflict")
}
Acceptance Criteria
- ✅ Snapshot isolation verified
- ✅ Conflict detection preventing anomalies
- ✅ MVCC with version chains working
- ✅ Concurrent transactions coexist
- ✅ Write-write conflicts detected
- ✅ Read-write conflicts detected
- ✅ Deadlock prevention working
- ✅ Garbage collection cleaning old versions
- ✅ > 80% code coverage
- ✅ 10K+ transactions/sec throughput
- ✅ < 5% abort rate for typical workloads
Summary: Week 9 Complete
By completing Week 9, you’ve learned and implemented:
1. ACID Properties
- Atomicity: All or nothing
- Consistency: Valid state
- Isolation: Concurrent access
- Durability: Crash survival
2. Isolation Levels
- Read Uncommitted → Serializable
- Performance vs consistency trade-offs
- Dirty reads, non-repeatable reads
- Phantom reads prevention
3. MVCC Architecture
- Multiple versions for concurrent access
- Version chains with visibility
- Snapshot isolation implementation
- Garbage collection of old versions
4. Conflict Detection
- Write-write conflicts
- Write-read conflicts
- Read-write conflicts
- Automatic abort on conflict
Metrics Achieved:
- ✅ Snapshot isolation guarantees
- ✅ Concurrent transactions coexist
- ✅ Automatic conflict detection
- ✅ Deadlock prevention
- ✅ 10K+ transactions/sec throughput
- ✅ < 5% abort rate for typical workloads
Ready for Week 10?
Next week we’ll implement replication and high availability with leader-follower architecture, automatic failover, and consistency models.