Transactions and ACID Properties

Learning Objectives

  • Master ACID properties and their implementation
  • Understand isolation levels and their trade-offs
  • Implement Multi-Version Concurrency Control (MVCC)
  • Build snapshot isolation for repeatable reads
  • Detect and prevent transaction conflicts
  • Implement deadlock detection and prevention

Lesson 9.1: Transaction Fundamentals

ACID Properties in Key-Value Stores

ACID guarantees are fundamental to databases. Let's understand each property and how to implement them in our key-value store.

Atomicity

All or nothing: a transaction either fully completes or is fully rolled back

Transfer $100 from Account A to Account B:
  Step 1: Deduct $100 from A
  Step 2: Add $100 to B

Both happen or neither happens.
If the system crashes between the steps, Step 1 is rolled back.
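
A common way to get all-or-nothing behavior, and the approach the engine in Lesson 9.2 takes, is to buffer writes privately and apply them in one step at commit. A minimal in-memory sketch; the `Store` and `Txn` types are illustrative, not part of the lesson's engine, and crash recovery is out of scope here:

```go
package main

import (
    "fmt"
    "sync"
)

// Store is a toy key-value store holding integer balances.
type Store struct {
    mu   sync.Mutex
    data map[string]int
}

// Txn buffers writes privately until Commit.
type Txn struct {
    store  *Store
    writes map[string]int
}

func (s *Store) Begin() *Txn {
    return &Txn{store: s, writes: make(map[string]int)}
}

// Get returns the transaction's own pending write first, then the store value.
func (t *Txn) Get(key string) int {
    if v, ok := t.writes[key]; ok {
        return v
    }
    t.store.mu.Lock()
    defer t.store.mu.Unlock()
    return t.store.data[key]
}

// Put only touches the private buffer; the store is unchanged.
func (t *Txn) Put(key string, v int) { t.writes[key] = v }

// Commit applies every buffered write under one lock, so other readers
// observe either none of the writes or all of them.
func (t *Txn) Commit() {
    t.store.mu.Lock()
    defer t.store.mu.Unlock()
    for k, v := range t.writes {
        t.store.data[k] = v
    }
    t.writes = nil
}

// Abort drops the buffer; the store was never modified.
func (t *Txn) Abort() { t.writes = nil }

func main() {
    s := &Store{data: map[string]int{"A": 150, "B": 50}}

    t := s.Begin()
    t.Put("A", t.Get("A")-100)
    t.Put("B", t.Get("B")+100)
    t.Abort() // give up between the steps
    fmt.Println(s.data["A"], s.data["B"]) // 150 50

    t = s.Begin()
    t.Put("A", t.Get("A")-100)
    t.Put("B", t.Get("B")+100)
    t.Commit()
    fmt.Println(s.data["A"], s.data["B"]) // 50 150
}
```

Because nothing reaches the store before `Commit`, rollback is just discarding the buffer; no undo information is needed.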

Consistency

A transaction takes the data from one valid state to another; constraints hold before and after it

Constraint: Balance >= 0

Before: A=$50, B=$50
Transaction: Transfer $100 (would violate constraint)
Result: Transaction rejected (consistency preserved)

Isolation

Concurrent transactions don't interfere

Transaction 1: Read Balance A
Transaction 2: Modify Balance A

Without isolation:
  T1 reads $100
  T2 modifies to $50
  T1 uses stale $100 (inconsistency!)

With isolation:
  T1 reads $100
  T2 waits for T1 to finish (locking), or T1 keeps reading its snapshot (MVCC)
  T1 sees a consistent view throughout

Durability

Committed data survives crashes

Transaction: Transfer $100, committed
Power failure 1 second later
System restarts
Data still shows transfer: Durable!

Isolation Levels

Each level trades consistency guarantees against performance:

Read Uncommitted (Dirty Reads Allowed)

Transaction A: Read X=100
Transaction B: Modify X to 50 (not committed)
Transaction A: Sees X=50 (DIRTY READ!)

Transaction B: Rollback
Transaction A: Used a value that was never committed (consistency violation!)
Isolation: Lowest | Consistency: Lowest | Performance: Highest
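
The timeline above can be reproduced with a store that writes in place and keeps an undo copy for rollback; a reader that never filters out uncommitted data is exactly a Read Uncommitted reader. A single-threaded sketch; `inPlaceStore` and its methods are illustrative:

```go
package main

import "fmt"

// inPlaceStore writes straight into shared state and remembers each key's
// pre-write value so a rollback can restore it.
type inPlaceStore struct {
    data map[string]int
    undo map[string]int // key → value before the uncommitted write
}

func (s *inPlaceStore) write(key string, v int) {
    if _, saved := s.undo[key]; !saved {
        s.undo[key] = s.data[key]
    }
    s.data[key] = v
}

func (s *inPlaceStore) rollback() {
    for k, old := range s.undo {
        s.data[k] = old
    }
    s.undo = make(map[string]int)
}

// read returns whatever is in shared state, committed or not.
func (s *inPlaceStore) read(key string) int { return s.data[key] }

func main() {
    s := &inPlaceStore{data: map[string]int{"X": 100}, undo: map[string]int{}}
    fmt.Println("A reads", s.read("X")) // A reads 100
    s.write("X", 50)                    // B writes, not committed
    dirty := s.read("X")                // A sees B's uncommitted value
    s.rollback()                        // B rolls back
    fmt.Println("A used", dirty, "but X is", s.read("X")) // A used 50 but X is 100
}
```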

Read Committed (No Dirty Reads)

Transaction A: Read X=100
Transaction B: Modify X to 50, COMMIT
Transaction A: Read X again = 50 (NON-REPEATABLE READ)

Same transaction sees different values!
Isolation: Medium | Consistency: Medium | Performance: Good
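
Under MVCC, the difference between Read Committed and the Repeatable Read level described next is which snapshot each read uses: a fresh one per read, or one fixed at transaction start. A single-threaded sketch over a list of committed versions of X; `version`, `readAsOf`, and the integer commit clock are illustrative assumptions:

```go
package main

import "fmt"

// version is one committed value of X and the commit clock value at which
// it became visible.
type version struct {
    commitTS int
    value    int
}

// readAsOf returns the newest value committed at or before ts (0 if none).
// history must be sorted by commitTS.
func readAsOf(history []version, ts int) int {
    val := 0
    for _, v := range history {
        if v.commitTS <= ts {
            val = v.value
        }
    }
    return val
}

func main() {
    history := []version{{commitTS: 1, value: 100}}
    clock := 1

    txnStart := clock // Transaction A begins
    first := readAsOf(history, clock)

    clock++ // Transaction B commits X=50
    history = append(history, version{commitTS: clock, value: 50})

    readCommitted := readAsOf(history, clock) // fresh snapshot per read
    repeatable := readAsOf(history, txnStart) // snapshot fixed at start
    fmt.Println(first, readCommitted, repeatable) // 100 50 100
}
```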

Repeatable Read (No Non-Repeatable Reads)

Transaction A: Read X=100
Transaction B: Modify X to 50, COMMIT
Transaction A: Read X again = 100 (REPEATABLE!)

But phantom records possible:
Transaction A: Range query (0-100)
Transaction B: Insert record at 75, COMMIT
Transaction A: Range query again = includes new record (PHANTOM READ!)
Isolation: High | Consistency: High | Performance: Moderate

Serializable (Highest Isolation)

Transactions execute as if serially, one after another.

Transaction A: Read/Write X, Y
Transaction B: Read/Write X, Y

The result is equivalent to running them one after another; if they conflict, the database blocks or aborts one of them.

SLOWEST but SAFEST.
Isolation: Highest | Consistency: Highest | Performance: Lowest

Multi-Version Concurrency Control (MVCC)

Problem: Locking reduces concurrency

Write lock held on row during transaction:
  T1: Lock row → Read → Process → Update (long operation)
  T2: Wants to read same row → BLOCKED for entire duration!

Solution: MVCC - multiple versions allow readers and writers to coexist

Instead of locking, maintain multiple versions:

Row "user:1" versions:
  v1 (tx_id=100): {"name": "alice", "age": 30}
  v2 (tx_id=105): {"name": "alice", "age": 31}
  v3 (tx_id=110): {"name": "alice", "age": 32}

Transaction A (start at v2):
  Reads: Always sees v2 (consistent snapshot)
  
Transaction B (start at v3):
  Modifies to v4: {"name": "alice", "age": 33}

Both proceed concurrently!

Version Chain

Every row has a version chain:

Key: user:1
v1 → v2 → v3 → v4 → ...

Each version is tagged with:
- Transaction ID (who created it)
- Commit timestamp
- Visibility (which transactions may see it, derived from the commit timestamp and each reader's snapshot)

Readers: Follow chain to find visible version
Writers: Append new version

Snapshot Isolation

Snapshot Isolation: Each transaction operates on a consistent snapshot

Global state at time T0:
  user:1 = alice
  user:2 = bob
  user:3 = charlie
  txn_id = 110

Transaction starts at T0:
  Snapshot: {user:1=alice, user:2=bob, user:3=charlie}
  All reads return from this snapshot
  Modifications create new versions

At T1, global state changes (other transactions):
  user:1 = alice_new
  user:2 = bob_new
  txn_id = 115

Transaction still reads snapshot values:
  Repeatable reads guaranteed!
  But new transactions see new values

Lesson 9.2: Implementation - Core Transaction Engine

Transaction Structure

package transaction

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// TransactionID uniquely identifies transaction
type TransactionID uint64

// VersionNumber tracks versions for MVCC
type VersionNumber uint64

// Transaction represents active transaction
type Transaction struct {
    ID              TransactionID
    StartTime       time.Time
    StartVersion    VersionNumber
    Status          TxnStatus
    ReadSet         map[string]VersionNumber  // key → version read
    WriteSet        map[string][]byte         // key → new value
    DeleteSet       map[string]bool           // keys marked for deletion
    mu              sync.RWMutex
}

type TxnStatus int

const (
    TxnPending TxnStatus = iota
    TxnCommitted
    TxnRolledback
    TxnAborted  // Due to conflict
)

func (t *Transaction) String() string {
    return fmt.Sprintf("Txn[id=%d, status=%d, startVer=%d]", t.ID, t.Status, t.StartVersion)
}

// VersionedValue is one version of a key (a value or a deletion tombstone)
type VersionedValue struct {
    Version   VersionNumber
    Value     []byte
    CreatedBy TransactionID
    CreatedAt time.Time
    IsDeleted bool
}

// VersionChain manages all versions of a key
type VersionChain struct {
    Key      string
    Versions []*VersionedValue
    mu       sync.RWMutex
}

// GetVisibleVersion returns version visible to transaction
func (vc *VersionChain) GetVisibleVersion(txn *Transaction) (*VersionedValue, error) {
    vc.mu.RLock()
    defer vc.mu.RUnlock()
    
    // Walk backwards through versions (newest first)
    for i := len(vc.Versions) - 1; i >= 0; i-- {
        v := vc.Versions[i]
        
        // Version must be created before transaction started
        if v.Version <= txn.StartVersion {
            if !v.IsDeleted {
                return v, nil
            }
            // Key was deleted
            return nil, fmt.Errorf("key deleted at version %d", v.Version)
        }
    }
    
    return nil, fmt.Errorf("key not found")
}

// AppendVersion adds new version
func (vc *VersionChain) AppendVersion(v *VersionedValue) {
    vc.mu.Lock()
    defer vc.mu.Unlock()
    vc.Versions = append(vc.Versions, v)
}

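// CleanupOldVersions removes versions that no active or future transaction
// can see. The oldest active snapshot still reads the newest version at or
// below oldestActiveVersion, so that version is kept along with all newer ones.
func (vc *VersionChain) CleanupOldVersions(oldestActiveVersion VersionNumber) int {
    vc.mu.Lock()
    defer vc.mu.Unlock()
    
    // Versions are appended in increasing order; find the newest one the
    // oldest snapshot can still see. Everything before it is garbage.
    keepFrom := 0
    for i, v := range vc.Versions {
        if v.Version <= oldestActiveVersion {
            keepFrom = i
        }
    }
    
    // Copy into a fresh slice so the dropped versions can be garbage collected.
    vc.Versions = append([]*VersionedValue(nil), vc.Versions[keepFrom:]...)
    return keepFrom
}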

Transaction Manager

// TransactionManager coordinates transactions
type TransactionManager struct {
    nextTxnID           uint64
    nextVersionNumber   VersionNumber
    
    activeTxns          map[TransactionID]*Transaction
    versionChains       map[string]*VersionChain
    
    mu                  sync.RWMutex
    conflictDetector    *ConflictDetector
    deadlockDetector    *DeadlockDetector
    // No undo log is needed: writes stay in the transaction's private
    // buffers until commit, so rollback only has to discard them.
}

func NewTransactionManager() *TransactionManager {
    return &TransactionManager{
        activeTxns:       make(map[TransactionID]*Transaction),
        versionChains:    make(map[string]*VersionChain),
        conflictDetector: NewConflictDetector(),
        deadlockDetector: NewDeadlockDetector(),
    }
}

// BeginTxn starts a new transaction whose snapshot covers every version
// committed so far.
func (tm *TransactionManager) BeginTxn(ctx context.Context) (*Transaction, error) {
    if err := ctx.Err(); err != nil {
        return nil, err
    }
    
    tm.mu.Lock()
    defer tm.mu.Unlock()
    
    txnID := TransactionID(atomic.AddUint64(&tm.nextTxnID, 1))
    
    txn := &Transaction{
        ID:           txnID,
        StartTime:    time.Now(),
        StartVersion: VersionNumber(atomic.LoadUint64((*uint64)(&tm.nextVersionNumber))),
        Status:       TxnPending,
        ReadSet:      make(map[string]VersionNumber),
        WriteSet:     make(map[string][]byte),
        DeleteSet:    make(map[string]bool),
    }
    
    tm.activeTxns[txnID] = txn
    return txn, nil
}

// Get retrieves a value as of the transaction's snapshot
func (tm *TransactionManager) Get(txn *Transaction, key string) ([]byte, error) {
    // Look up the chain before locking txn, so tm.mu is never acquired while
    // txn.mu is held (CommitTxn and RollbackTxn take them in that order).
    tm.mu.RLock()
    chain, exists := tm.versionChains[key]
    tm.mu.RUnlock()
    
    txn.mu.Lock()
    defer txn.mu.Unlock()
    
    // Check status under the lock to avoid racing with commit/rollback
    if txn.Status != TxnPending {
        return nil, fmt.Errorf("transaction not pending")
    }
    
    // Check write buffer first (read-your-writes)
    if val, ok := txn.WriteSet[key]; ok {
        return val, nil
    }
    
    // Check delete buffer
    if txn.DeleteSet[key] {
        return nil, fmt.Errorf("key marked for deletion")
    }
    
    if !exists {
        // Record the miss (version 0) so commit-time validation knows this key was read
        txn.ReadSet[key] = 0
        return nil, fmt.Errorf("key not found")
    }
    
    // Get visible version
    version, err := chain.GetVisibleVersion(txn)
    if err != nil {
        txn.ReadSet[key] = 0 // deleted or not yet visible in this snapshot
        return nil, err
    }
    
    // Record read
    txn.ReadSet[key] = version.Version
    
    return version.Value, nil
}

// Put buffers a write; nothing is visible to other transactions until commit
func (tm *TransactionManager) Put(txn *Transaction, key string, value []byte) error {
    txn.mu.Lock()
    defer txn.mu.Unlock()
    
    if txn.Status != TxnPending {
        return fmt.Errorf("transaction not pending")
    }
    
    // Remove from delete set if present
    delete(txn.DeleteSet, key)
    
    // Copy so later changes to the caller's slice cannot alter the pending write
    txn.WriteSet[key] = append([]byte(nil), value...)
    
    return nil
}

// Delete marks key for deletion (buffered until commit)
func (tm *TransactionManager) Delete(txn *Transaction, key string) error {
    txn.mu.Lock()
    defer txn.mu.Unlock()
    
    if txn.Status != TxnPending {
        return fmt.Errorf("transaction not pending")
    }
    
    // Remove from write set if present
    delete(txn.WriteSet, key)
    
    // Add to delete set
    txn.DeleteSet[key] = true
    
    return nil
}

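// CommitTxn validates the transaction and, if validation passes, publishes
// all of its writes and deletes under one new version number.
func (tm *TransactionManager) CommitTxn(txn *Transaction) error {
    tm.mu.Lock()
    defer tm.mu.Unlock()
    
    txn.mu.Lock()
    defer txn.mu.Unlock()
    
    if txn.Status != TxnPending {
        return fmt.Errorf("transaction not pending: %v", txn.Status)
    }
    
    // Validate against everything committed since txn's snapshot. tm.mu
    // serializes commits, so no other commit can slip in between the
    // check and the apply below.
    if err := tm.conflictDetector.CheckConflict(txn); err != nil {
        txn.Status = TxnAborted
        delete(tm.activeTxns, txn.ID)
        return fmt.Errorf("conflict detected: %w", err)
    }
    
    // No deadlock check here: a commit never waits on another transaction,
    // so it adds no wait-for edges. Deadlock detection applies where a
    // transaction blocks on a lock (see DeadlockDetector.RecordWait).
    
    // Read-only transactions publish nothing and need no new version
    if len(txn.WriteSet) == 0 && len(txn.DeleteSet) == 0 {
        txn.Status = TxnCommitted
        delete(tm.activeTxns, txn.ID)
        return nil
    }
    
    // One version number for all writes. BeginTxn also takes tm.mu, so no
    // snapshot can be taken while the versions are being appended.
    newVersion := VersionNumber(atomic.AddUint64((*uint64)(&tm.nextVersionNumber), 1))
    now := time.Now()
    
    // Apply writes
    for key, value := range txn.WriteSet {
        tm.getOrCreateChain(key).AppendVersion(&VersionedValue{
            Version:   newVersion,
            Value:     value,
            CreatedBy: txn.ID,
            CreatedAt: now,
        })
    }
    
    // Apply deletes as tombstone versions
    for key := range txn.DeleteSet {
        tm.getOrCreateChain(key).AppendVersion(&VersionedValue{
            Version:   newVersion,
            CreatedBy: txn.ID,
            CreatedAt: now,
            IsDeleted: true,
        })
    }
    
    tm.conflictDetector.RecordCommit(txn, newVersion)
    
    txn.Status = TxnCommitted
    delete(tm.activeTxns, txn.ID)
    
    return nil
}

// RollbackTxn discards the transaction's buffered changes. Calling it on a
// transaction that already committed or aborted is a no-op.
func (tm *TransactionManager) RollbackTxn(txn *Transaction) error {
    tm.mu.Lock()
    defer tm.mu.Unlock()
    
    txn.mu.Lock()
    defer txn.mu.Unlock()
    
    if txn.Status != TxnPending {
        return nil
    }
    
    // Clear write/delete buffers
    txn.WriteSet = make(map[string][]byte)
    txn.DeleteSet = make(map[string]bool)
    txn.ReadSet = make(map[string]VersionNumber)
    
    txn.Status = TxnRolledback
    delete(tm.activeTxns, txn.ID)
    
    return nil
}

// getOrCreateChain returns the chain for key, creating it if needed.
// The caller must hold tm.mu for writing.
func (tm *TransactionManager) getOrCreateChain(key string) *VersionChain {
    if chain, ok := tm.versionChains[key]; ok {
        return chain
    }
    
    chain := &VersionChain{Key: key, Versions: make([]*VersionedValue, 0)}
    tm.versionChains[key] = chain
    return chain
}

Conflict Detection

// ConflictDetector validates a committing transaction against writes that
// other transactions committed after its snapshot was taken.
type ConflictDetector struct {
    lastCommitted map[string]VersionNumber // key → version of its latest committed write or delete
    mu            sync.RWMutex
}

func NewConflictDetector() *ConflictDetector {
    return &ConflictDetector{
        lastCommitted: make(map[string]VersionNumber),
    }
}

// changedSince reports whether another transaction committed key after
// snapshot. The caller holds cd.mu.
func (cd *ConflictDetector) changedSince(key string, snapshot VersionNumber) (VersionNumber, bool) {
    v, ok := cd.lastCommitted[key]
    return v, ok && v > snapshot
}

// CheckConflict returns an error if txn must abort. The caller holds txn.mu.
func (cd *ConflictDetector) CheckConflict(txn *Transaction) error {
    cd.mu.RLock()
    defer cd.mu.RUnlock()
    
    // Write-write: a key we write or delete was committed by another
    // transaction after our snapshot. First committer wins; we abort.
    for key := range txn.WriteSet {
        if v, changed := cd.changedSince(key, txn.StartVersion); changed {
            return fmt.Errorf("write-write conflict on key %s (committed at version %d)", key, v)
        }
    }
    for key := range txn.DeleteSet {
        if v, changed := cd.changedSince(key, txn.StartVersion); changed {
            return fmt.Errorf("write-write conflict on deleted key %s (committed at version %d)", key, v)
        }
    }
    
    // Read validation, for update transactions only: a key we read was
    // overwritten after our snapshot, so our writes may rest on stale data.
    // This is where read-write and write-read conflicts surface: the reader
    // is checked at its own commit instead of blocking the writer.
    // Read-only transactions skip it and commit at their snapshot.
    if len(txn.WriteSet) == 0 && len(txn.DeleteSet) == 0 {
        return nil
    }
    for key := range txn.ReadSet {
        if v, changed := cd.changedSince(key, txn.StartVersion); changed {
            return fmt.Errorf("read-write conflict on key %s (overwritten at version %d)", key, v)
        }
    }
    
    return nil
}

// RecordCommit notes the version at which txn's writes and deletes became
// visible. Call it after a successful CheckConflict, while commits are
// still serialized.
func (cd *ConflictDetector) RecordCommit(txn *Transaction, version VersionNumber) {
    cd.mu.Lock()
    defer cd.mu.Unlock()
    
    for key := range txn.WriteSet {
        cd.lastCommitted[key] = version
    }
    for key := range txn.DeleteSet {
        cd.lastCommitted[key] = version
    }
}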

Deadlock Detection

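// DeadlockDetector keeps a wait-for graph and refuses any wait that would
// close a cycle, so a circular wait never forms.
type DeadlockDetector struct {
    waitGraph map[TransactionID]map[TransactionID]bool  // txn A → txns it waits for
    mu        sync.RWMutex
}

func NewDeadlockDetector() *DeadlockDetector {
    return &DeadlockDetector{
        waitGraph: make(map[TransactionID]map[TransactionID]bool),
    }
}

// RecordWait records that txnA is about to wait for txnB. If the new edge
// would create a cycle, it is removed again and an error is returned; the
// caller should then abort txnA instead of waiting.
func (dd *DeadlockDetector) RecordWait(txnA, txnB TransactionID) error {
    dd.mu.Lock()
    defer dd.mu.Unlock()
    
    if dd.waitGraph[txnA] == nil {
        dd.waitGraph[txnA] = make(map[TransactionID]bool)
    }
    
    dd.waitGraph[txnA][txnB] = true
    
    // The graph was acyclic before this edge, so any cycle must pass
    // through txnA → txnB; a DFS from txnA is enough to find it.
    if dd.hasCycle(txnA) {
        delete(dd.waitGraph[txnA], txnB)
        return fmt.Errorf("deadlock detected: cycle involving txn %d", txnA)
    }
    
    return nil
}

// CheckDeadlock reports whether a wait cycle is reachable from txn.
func (dd *DeadlockDetector) CheckDeadlock(txn *Transaction) error {
    dd.mu.RLock()
    defer dd.mu.RUnlock()
    
    if dd.hasCycle(txn.ID) {
        return fmt.Errorf("deadlock detected: cycle reachable from txn %d", txn.ID)
    }
    return nil
}

// RemoveTxn drops all edges from and to a finished transaction.
func (dd *DeadlockDetector) RemoveTxn(id TransactionID) {
    dd.mu.Lock()
    defer dd.mu.Unlock()
    
    delete(dd.waitGraph, id)
    for _, waitsFor := range dd.waitGraph {
        delete(waitsFor, id)
    }
}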

// hasCycle detects cycle using DFS
func (dd *DeadlockDetector) hasCycle(start TransactionID) bool {
    visited := make(map[TransactionID]bool)
    recStack := make(map[TransactionID]bool)
    
    return dd.dfs(start, visited, recStack)
}

func (dd *DeadlockDetector) dfs(node TransactionID, visited, recStack map[TransactionID]bool) bool {
    visited[node] = true
    recStack[node] = true
    
    for neighbor := range dd.waitGraph[node] {
        if !visited[neighbor] {
            if dd.dfs(neighbor, visited, recStack) {
                return true
            }
        } else if recStack[neighbor] {
            return true  // Back edge = cycle
        }
    }
    
    recStack[node] = false
    return false
}
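
Cycle detection lets the wait-for graph grow and rejects the edge that would close a cycle. A common alternative for deadlock prevention, not used by the code above, is the wait-die scheme: when a transaction requests a lock held by another, it may wait only if it is older (smaller ID); a younger requester aborts ("dies") and retries later with its original ID so it eventually becomes the oldest. Because waits only ever go from older to younger transactions, no cycle of waits can form. A minimal sketch of the decision rule; `decide` and `Decision` are illustrative names, and it assumes IDs increase with begin time, as the IDs from `BeginTxn` do:

```go
package main

import "fmt"

// Decision is what the lock manager does with a conflicting lock request.
type Decision int

const (
    Wait Decision = iota // requester blocks until the holder releases
    Die                  // requester aborts and retries later with its original ID
)

func (d Decision) String() string {
    if d == Wait {
        return "wait"
    }
    return "die"
}

// decide applies wait-die for requester != holder. A smaller ID means an
// older transaction, so only older-waits-for-younger edges are ever created.
func decide(requester, holder uint64) Decision {
    if requester < holder {
        return Wait
    }
    return Die
}

func main() {
    fmt.Println(decide(100, 105)) // prints "wait": older T100 waits for younger T105
    fmt.Println(decide(105, 100)) // prints "die": younger T105 aborts instead of waiting
}
```

The trade-off versus the wait-for graph is that wait-die may abort transactions that would never have deadlocked, in exchange for needing no graph or cycle search.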

Lesson 9.3: Usage Examples and Complete Workflows

Example 1: Simple Transaction

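// Transfer money between accounts.
// Needs imports "context", "fmt", "strconv" in package transaction.

// parseInt and formatInt are helpers for these examples: values are stored
// as decimal strings, and a missing or malformed value counts as 0.
func parseInt(b []byte) int {
    n, err := strconv.Atoi(string(b))
    if err != nil {
        return 0
    }
    return n
}

func formatInt(n int) []byte { return []byte(strconv.Itoa(n)) }

func TransferMoney(tm *TransactionManager, fromKey, toKey string, amount int) error {
    // Begin transaction
    txn, err := tm.BeginTxn(context.Background())
    if err != nil {
        return err
    }
    
    // abort rolls back and returns err, for every failure before commit
    abort := func(err error) error {
        tm.RollbackTxn(txn)
        return err
    }
    
    // Read balances; a missing account is an error, not a zero balance
    fromBal, err := tm.Get(txn, fromKey)
    if err != nil {
        return abort(fmt.Errorf("read %s: %w", fromKey, err))
    }
    toBal, err := tm.Get(txn, toKey)
    if err != nil {
        return abort(fmt.Errorf("read %s: %w", toKey, err))
    }
    
    fromVal := parseInt(fromBal)
    toVal := parseInt(toBal)
    
    // Validate
    if fromVal < amount {
        return abort(fmt.Errorf("insufficient funds"))
    }
    
    // Update balances
    if err := tm.Put(txn, fromKey, formatInt(fromVal-amount)); err != nil {
        return abort(err)
    }
    if err := tm.Put(txn, toKey, formatInt(toVal+amount)); err != nil {
        return abort(err)
    }
    
    // Commit; on failure CommitTxn has already aborted the transaction
    return tm.CommitTxn(txn)
}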

Example 2: Read-Modify-Write with Retry

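// Needs imports "context", "fmt", "strings" in package transaction.
func IncrementCounter(tm *TransactionManager, key string, maxRetries int) error {
    for attempt := 0; attempt < maxRetries; attempt++ {
        txn, err := tm.BeginTxn(context.Background())
        if err != nil {
            return err
        }
        
        // Read current value. A missing key reads as nil; this example
        // assumes parseInt maps that to 0.
        val, _ := tm.Get(txn, key)
        current := parseInt(val)
        
        // Increment
        if err := tm.Put(txn, key, formatInt(current+1)); err != nil {
            tm.RollbackTxn(txn)
            return err
        }
        
        // Try to commit
        err = tm.CommitTxn(txn)
        if err == nil {
            return nil
        }
        
        // A conflict means another writer committed first: retry with a fresh snapshot
        if strings.Contains(err.Error(), "conflict") {
            continue
        }
        
        return err
    }
    
    return fmt.Errorf("max retries exceeded")
}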

Example 3: Multi-Step Transaction

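func ProcessOrder(tm *TransactionManager, orderedKeys []string) error {
    txn, err := tm.BeginTxn(context.Background())
    if err != nil {
        return err
    }
    defer func() {
        // Roll back on any early return; after CommitTxn the status is no longer pending
        if txn.Status == TxnPending {
            tm.RollbackTxn(txn)
        }
    }()
    
    // Step 1: Reserve one unit of each ordered item
    for _, key := range orderedKeys {
        qty, err := tm.Get(txn, key)
        if err != nil {
            return fmt.Errorf("read %s: %w", key, err)
        }
        current := parseInt(qty)
        if current < 1 {
            return fmt.Errorf("out of stock: %s", key)
        }
        if err := tm.Put(txn, key, formatInt(current-1)); err != nil {
            return err
        }
    }
    
    // Step 2: Create order record (generateID is a hypothetical unique-ID helper)
    orderID := generateID()
    if err := tm.Put(txn, "order:"+orderID, []byte("pending")); err != nil {
        return err
    }
    
    // Step 3: Commit all changes atomically
    return tm.CommitTxn(txn)
}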

Lab 9.1: Implement Complete MVCC Transaction Engine

Objective

Build a production-ready transaction system with MVCC, conflict detection, and deadlock prevention.

Requirements

  • Transaction Support: Begin/commit/rollback operations, snapshot isolation
  • MVCC Implementation: Multiple versions per key, version chains with visibility
  • Conflict Detection: Write-write, write-read, read-write conflicts
  • Deadlock Prevention: Wait-for graph, cycle detection
  • Testing: Basic read/write transactions, snapshot isolation verification
  • Benchmarks: Transaction throughput, conflict rate impact, memory overhead

Starter Code

package transaction

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// TransactionID, VersionNumber and TxnStatus match the engine above
type TransactionID uint64
type VersionNumber uint64
type TxnStatus int

const (
    TxnPending TxnStatus = iota
    TxnCommitted
    TxnRolledback
    TxnAborted
)

// errNotImplemented keeps the stubs compiling; tests fail until you replace it
var errNotImplemented = fmt.Errorf("not implemented")

// Transaction represents database transaction
type Transaction struct {
    ID           TransactionID
    StartTime    time.Time
    StartVersion VersionNumber
    Status       TxnStatus
    // TODO: Add read set, write set, delete set
}

// TransactionManager coordinates transactions
type TransactionManager struct {
    mu sync.RWMutex
    // TODO: Add fields for transaction tracking, version management
}

// NewTransactionManager creates a manager (used by the tests below)
// TODO: Initialize maps, conflict detector, deadlock detector
func NewTransactionManager() *TransactionManager {
    return &TransactionManager{}
}

// BeginTxn starts new transaction
// TODO: Implement with snapshot capture
func (tm *TransactionManager) BeginTxn(ctx context.Context) (*Transaction, error) {
    return nil, errNotImplemented
}

// Get reads value within transaction
// TODO: Implement with snapshot isolation
func (tm *TransactionManager) Get(txn *Transaction, key string) ([]byte, error) {
    return nil, errNotImplemented
}

// Put buffers write in transaction
// TODO: Implement write buffering
func (tm *TransactionManager) Put(txn *Transaction, key string, value []byte) error {
    return errNotImplemented
}

// Delete buffers delete in transaction
// TODO: Implement delete buffering
func (tm *TransactionManager) Delete(txn *Transaction, key string) error {
    return errNotImplemented
}

// CommitTxn validates and applies writes
// TODO: Implement conflict detection and commit
func (tm *TransactionManager) CommitTxn(txn *Transaction) error {
    return errNotImplemented
}

// RollbackTxn discards transaction
// TODO: Implement cleanup
func (tm *TransactionManager) RollbackTxn(txn *Transaction) error {
    return errNotImplemented
}

Test Template

package transaction

import (
    "context"
    "testing"

    "github.com/stretchr/testify/assert"
)

func TestSnapshotIsolation(t *testing.T) {
    tm := NewTransactionManager()
    
    // Setup: initial value
    txn0, _ := tm.BeginTxn(context.Background())
    tm.Put(txn0, "key", []byte("initial"))
    tm.CommitTxn(txn0)
    
    // Transaction A: Read value
    txnA, _ := tm.BeginTxn(context.Background())
    v1, _ := tm.Get(txnA, "key")
    assert.Equal(t, "initial", string(v1))
    
    // Transaction B: Modify and commit
    txnB, _ := tm.BeginTxn(context.Background())
    tm.Put(txnB, "key", []byte("modified"))
    assert.NoError(t, tm.CommitTxn(txnB))
    
    // Transaction A: Still sees initial (snapshot isolation!)
    v2, _ := tm.Get(txnA, "key")
    assert.Equal(t, "initial", string(v2), "snapshot isolation violated")
    
    // Commit A
    assert.NoError(t, tm.CommitTxn(txnA))
}

func TestWriteWriteConflict(t *testing.T) {
    tm := NewTransactionManager()
    
    // Setup: initial value
    txn0, _ := tm.BeginTxn(context.Background())
    tm.Put(txn0, "key", []byte("initial"))
    tm.CommitTxn(txn0)
    
    txnA, _ := tm.BeginTxn(context.Background())
    txnB, _ := tm.BeginTxn(context.Background())
    
    // Both modify same key
    tm.Put(txnA, "key", []byte("valueA"))
    tm.Put(txnB, "key", []byte("valueB"))
    
    // First commit succeeds
    assert.NoError(t, tm.CommitTxn(txnA))
    
    // Second fails (write-write conflict)
    err := tm.CommitTxn(txnB)
    assert.Error(t, err, "expected write-write conflict")
}

Acceptance Criteria

  • ✅ Snapshot isolation verified
  • ✅ Conflict detection preventing anomalies
  • ✅ MVCC with version chains working
  • ✅ Concurrent transactions coexist
  • ✅ Write-write conflicts detected
  • ✅ Read-write conflicts detected
  • ✅ Deadlock prevention working
  • ✅ Garbage collection cleaning old versions
  • ✅ > 80% code coverage
  • ✅ 10K+ transactions/sec throughput
  • ✅ < 5% abort rate for typical workloads

Summary: Week 9 Complete

By completing Week 9, you've learned and implemented:

1. ACID Properties

  • Atomicity: All or nothing
  • Consistency: Valid state
  • Isolation: Concurrent access
  • Durability: Crash survival

2. Isolation Levels

  • Read Uncommitted → Serializable
  • Performance vs consistency trade-offs
  • Dirty reads, non-repeatable reads
  • Preventing phantom reads

3. MVCC Architecture

  • Multiple versions for concurrent access
  • Version chains with visibility
  • Snapshot isolation implementation
  • Garbage collection of old versions

4. Conflict Detection

  • Write-write conflicts
  • Write-read conflicts
  • Read-write conflicts
  • Automatic abort on conflict

Target Metrics:

  • ✅ Snapshot isolation guarantees
  • ✅ Concurrent transactions coexist
  • ✅ Automatic conflict detection
  • ✅ Deadlock prevention
  • ✅ 10K+ transactions/sec throughput
  • ✅ <5% abort rate for typical workloads

Ready for Week 10?

Next week we'll implement replication and high availability with leader-follower architecture, automatic failover, and consistency models.
