Replication and High Availability

Learning Objectives

  • Master leader-follower replication architecture
  • Implement automatic failover and leader election
  • Understand consistency models and their trade-offs
  • Build replication lag monitoring and health checks
  • Design high-availability distributed systems
  • Handle split-brain scenarios and data consistency

Lesson 10.1: Replication Strategies

Leader-Follower Replication

Architecture: One leader node, multiple follower nodes

                 Client Write
                      │
                      ▼
                   LEADER
                      │  replication log
        ┌─────────────┼─────────────┐
        ▼             ▼             ▼
   FOLLOWER 1    FOLLOWER 2    FOLLOWER 3
   (read-only)   (read-only)   (read-only)

Write Flow

  1. Client sends write to leader
  2. Leader applies write locally
  3. Leader sends write to all followers
  4. Followers apply write (eventually consistent)
  5. Leader acknowledges success to client

Timeline

t0: Write arrives at leader
t1: Leader applies locally (durable to WAL)
t2: Replication sent to followers
t3-5: Followers apply (replication lag)
t6: Acknowledgment sent to client
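
The durability note at t1 matters: the entry must reach the leader's own write-ahead log before it is handed to replication, otherwise a leader crash can lose an acknowledged write. A minimal sketch of that local step is below; it assumes only the standard library os package, and is not part of the implementation later in this lesson.

// appendToWAL makes a serialized log entry durable before it is replicated.
// Sketch only: wal can be an *os.File opened in append mode.
func appendToWAL(wal *os.File, encodedEntry []byte) error {
    if _, err := wal.Write(encodedEntry); err != nil {
        return err
    }
    // fsync so the entry survives a leader crash between t1 and t6
    return wal.Sync()
}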

Advantages

  • Simple to implement
  • Strong consistency for reads from leader
  • Easy failover (promote follower to leader)
  • One write path

Disadvantages

  • Leader is a single point of failure
  • Write throughput limited by leader capacity
  • Replication lag (followers trail the leader)
  • Failover is not automatic without additional machinery (failure detection and leader election)

Implementation

// ReplicationState tracks replication progress
type ReplicationState struct {
    nodeID          string
    role            NodeRole
    term            uint64
    lastLogIndex    uint64
    lastAppliedIndex uint64
    commitIndex     uint64
    
    // Followers track replication progress
    nextIndex       map[string]uint64   // leader → next index per follower
    matchIndex      map[string]uint64   // leader → confirmed index per follower
    
    // Leader election
    votedFor        string
    votesReceived   map[string]bool
    
    mu              sync.RWMutex
}

type NodeRole int

const (
    RoleFollower NodeRole = iota
    RoleCandidate
    RoleLeader
)

// Leader replicates writes to followers
type Leader struct {
    nodeID          string
    followers       map[string]*FollowerState
    replicationLog  []LogEntry
    commitIndex     uint64
    mu              sync.RWMutex
}

type FollowerState struct {
    nodeID      string
    conn        net.Conn
    nextIndex   uint64
    matchIndex  uint64
    lastAckTime time.Time // time of the most recent acknowledgment from this follower
    sendCh      chan *LogEntry
}

type LogEntry struct {
    Index   uint64
    Term    uint64
    Op      string
    Key     []byte
    Value   []byte
    TxnID   uint64
}

// ApplyWrite applies write and replicates
func (l *Leader) ApplyWrite(entry *LogEntry) error {
    l.mu.Lock()
    defer l.mu.Unlock()
    
    // Append to leader's log
    entry.Index = uint64(len(l.replicationLog))
    entry.Term = l.getCurrentTerm()
    l.replicationLog = append(l.replicationLog, *entry)
    
    // Replicate to followers (holding the lock keeps this sketch simple;
    // a production leader would send outside the critical section)
    for _, follower := range l.followers {
        select {
        case follower.sendCh <- entry:
        case <-time.After(5 * time.Second):
            log.Printf("timeout replicating to %s", follower.nodeID)
        }
    }
    
    return nil
}

// UpdateMatchIndex records a follower's acknowledgment of a replicated index
func (l *Leader) UpdateMatchIndex(followerID string, matchIndex uint64) {
    l.mu.Lock()
    defer l.mu.Unlock()
    
    if follower, ok := l.followers[followerID]; ok {
        follower.matchIndex = matchIndex
        follower.lastAckTime = time.Now()
        l.updateCommitIndex()
    }
}

// updateCommitIndex advances the commit index once a majority has replicated
func (l *Leader) updateCommitIndex() {
    if len(l.followers) == 0 {
        return
    }
    
    // Sort follower match indexes in descending order
    indices := make([]uint64, 0, len(l.followers))
    for _, f := range l.followers {
        indices = append(indices, f.matchIndex)
    }
    sort.Slice(indices, func(i, j int) bool { return indices[i] > indices[j] })
    
    // Counting the leader (which always holds the latest entry), a majority of
    // the N = len(followers)+1 nodes has an entry once floor(N/2) followers
    // confirm it; after the descending sort that is this element:
    majorityIndex := indices[(len(indices)-1)/2]
    
    if majorityIndex > l.commitIndex {
        l.commitIndex = majorityIndex
    }
}
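
The follower side of this write path is not shown above. A minimal sketch of a follower apply loop is below, assuming a simple Store interface and "put"/"delete" operation names, neither of which is defined elsewhere in this lesson; each acknowledged index would feed back into UpdateMatchIndex on the leader, which in turn advances the commit index.

// Store is an assumed key-value interface the follower applies entries to.
type Store interface {
    Put(key, value []byte) error
    Delete(key []byte) error
}

// Follower applies replicated entries in log order (sketch only).
type Follower struct {
    nodeID      string
    log         []LogEntry
    lastApplied uint64
    store       Store
    entryCh     chan *LogEntry // entries arriving from the leader
    ackCh       chan<- uint64  // applied index reported back to the leader
}

// Run consumes entries, applies them in order, and acknowledges each one.
func (f *Follower) Run() {
    for entry := range f.entryCh {
        f.log = append(f.log, *entry)
        switch entry.Op {
        case "put":
            f.store.Put(entry.Key, entry.Value)
        case "delete":
            f.store.Delete(entry.Key)
        }
        f.lastApplied = entry.Index
        // Report progress; on the leader this becomes UpdateMatchIndex.
        f.ackCh <- entry.Index
    }
}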

Asynchronous vs Synchronous Replication

Asynchronous: Leader doesn't wait for follower acknowledgment

Timeline:
t0: Leader writes locally
t1: Acknowledges to client ("success!")
t2: Sends to follower (async)
t3: Follower applies

Risk: If leader crashes between t1-t3:
     Client thinks write persisted
     But followers don't have it
     Data loss on failover!

Consistency: Eventually consistent
Latency: Very low
Safety: Lower

Synchronous: Leader waits for follower acknowledgment

Timeline:
t0: Leader writes locally
t1: Sends to follower
t2: Follower acknowledges
t3: Leader acknowledges to client

Guarantee: Write is replicated before returning

Risk: If any follower slow/down:
     All writes block (latency = slowest follower)

Consistency: Strongly consistent
Latency: High
Safety: Very high

Semi-Synchronous: Wait for N followers (best balance)

Replication to at least 1 follower (out of 3):

Timeline:
t0: Leader writes
t1: Follower-1 acks
t2: Leader acks to client
    (don't wait for follower-2, 3)

Consistency: Eventual (within RTT)
Latency: Good
Safety: Good

Configuration:
- min_replicas_to_ack = 1 (wait for 1 follower)
- timeout = 1 second (fallback to async after timeout)

Implementation

type ReplicationMode int

const (
    ModeAsync ReplicationMode = iota
    ModeSync
    ModeSemiSync
)

// ApplyWriteWithMode applies write based on replication mode
func (l *Leader) ApplyWriteWithMode(entry *LogEntry, mode ReplicationMode, minReplicas int) error {
    l.mu.Lock()
    
    // Add to log
    entry.Index = uint64(len(l.replicationLog))
    entry.Term = l.getCurrentTerm()
    l.replicationLog = append(l.replicationLog, *entry)
    
    l.mu.Unlock()
    
    switch mode {
    case ModeAsync:
        // Send to followers async, return immediately
        for _, follower := range l.followers {
            go func(f *FollowerState) {
                select {
                case f.sendCh <- entry:
                case <-time.After(5 * time.Second):
                }
            }(follower)
        }
        return nil
        
    case ModeSync:
        // Wait for all followers
        acksCh := make(chan string, len(l.followers))
        for _, follower := range l.followers {
            go func(f *FollowerState) {
                f.sendCh <- entry
                // Sketch: treat the send as the ack; a real implementation
                // would wait for the follower's response here
                acksCh <- f.nodeID
            }(follower)
        }
        
        for i := 0; i < len(l.followers); i++ {
            select {
            case <-acksCh:
            case <-time.After(5 * time.Second):
                return fmt.Errorf("replication timeout")
            }
        }
        return nil
        
    case ModeSemiSync:
        // Wait for N followers
        acksCh := make(chan string, len(l.followers))
        ackCount := 0
        
        for _, follower := range l.followers {
            go func(f *FollowerState) {
                f.sendCh <- entry
                acksCh <- f.nodeID
            }(follower)
        }
        
        deadline := time.Now().Add(1 * time.Second)
        for ackCount < minReplicas {
            select {
            case <-acksCh:
                ackCount++
            case <-time.After(time.Until(deadline)):
                if ackCount == 0 {
                    return fmt.Errorf("replication failed: no acks received")
                }
                // Deadline hit with at least one ack: fall back to async
                // for the remaining followers (see the timeout note above)
                return nil
            }
        }
        return nil
    }
    
    return fmt.Errorf("unknown replication mode")
}
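
For orientation, a call site for the semi-synchronous mode might look like the following; handlePut is an illustrative name, not part of the API above.

// Illustrative only: a write handler that requires one follower ack.
func (l *Leader) handlePut(key, value []byte) error {
    entry := &LogEntry{Op: "put", Key: key, Value: value}
    return l.ApplyWriteWithMode(entry, ModeSemiSync, 1)
}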

Replication Lag Monitoring

// ReplicationLagMonitor tracks per-follower replication lag
type ReplicationLagMonitor struct {
    followerLag map[string]time.Duration
    mu          sync.RWMutex
}

// NewReplicationLagMonitor initializes the lag map (required before UpdateLag)
func NewReplicationLagMonitor() *ReplicationLagMonitor {
    return &ReplicationLagMonitor{followerLag: make(map[string]time.Duration)}
}

// UpdateLag records lag for follower
func (rlm *ReplicationLagMonitor) UpdateLag(followerID string, lag time.Duration) {
    rlm.mu.Lock()
    defer rlm.mu.Unlock()
    rlm.followerLag[followerID] = lag
}

// GetLag returns lag for follower
func (rlm *ReplicationLagMonitor) GetLag(followerID string) time.Duration {
    rlm.mu.RLock()
    defer rlm.mu.RUnlock()
    return rlm.followerLag[followerID]
}

// GetMaxLag returns maximum lag
func (rlm *ReplicationLagMonitor) GetMaxLag() time.Duration {
    rlm.mu.RLock()
    defer rlm.mu.RUnlock()
    
    var maxLag time.Duration
    for _, lag := range rlm.followerLag {
        if lag > maxLag {
            maxLag = lag
        }
    }
    return maxLag
}

// ReplicationHealthCheck flags followers whose last acknowledgment is too old
func (l *Leader) ReplicationHealthCheck(maxAcceptableLag time.Duration) error {
    l.mu.RLock()
    defer l.mu.RUnlock()
    
    for followerID, follower := range l.followers {
        // Lag is approximated as time since the follower's last ack
        // (followers that have never acked report a very large lag)
        lag := time.Since(follower.lastAckTime)
        
        if lag > maxAcceptableLag {
            return fmt.Errorf("replication lag too high for %s: %v", followerID, lag)
        }
    }
    
    return nil
}
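
One way to keep the lag monitor fed is a periodic sampling loop on the leader. The sketch below reuses the lastAckTime field from FollowerState and approximates lag as time since the last acknowledgment; a real system might instead compare apply timestamps carried in the acks.

// monitorLag samples per-follower lag at a fixed interval (sketch only).
func (l *Leader) monitorLag(rlm *ReplicationLagMonitor, interval time.Duration, stopCh <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            l.mu.RLock()
            for id, f := range l.followers {
                rlm.UpdateLag(id, time.Since(f.lastAckTime))
            }
            l.mu.RUnlock()
        case <-stopCh:
            return
        }
    }
}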

Lesson 10.2: Failover and Automatic Leader Election

Detecting Leader Failure

// FailoverManager detects leader failure and triggers election
type FailoverManager struct {
    cluster         []*ClusterNode
    leaderDetectionTimeout time.Duration
    heartbeatInterval      time.Duration
}

type ClusterNode struct {
    nodeID      string
    addr        string
    isLeader    bool
    lastHeartbeat time.Time
}

// DetectLeaderFailure checks if leader is healthy
func (fm *FailoverManager) DetectLeaderFailure() error {
    leader := fm.FindLeader()
    if leader == nil {
        return fmt.Errorf("no leader found")
    }
    
    // Check if leader is alive
    timeSinceHeartbeat := time.Since(leader.lastHeartbeat)
    
    if timeSinceHeartbeat > fm.leaderDetectionTimeout {
        log.Printf("Leader %s failed (no heartbeat for %v)", leader.nodeID, timeSinceHeartbeat)
        return fmt.Errorf("leader failed")
    }
    
    return nil
}

// TriggerElection starts new leader election
func (fm *FailoverManager) TriggerElection() error {
    log.Println("Triggering leader election...")
    
    // All nodes become candidates and vote
    var wg sync.WaitGroup
    for _, node := range fm.cluster {
        wg.Add(1)
        go func(n *ClusterNode) {
            defer wg.Done()
            n.StartElection()
        }(node)
    }
    
    wg.Wait()
    
    // Wait for leader election
    deadline := time.Now().Add(fm.leaderDetectionTimeout)
    for time.Now().Before(deadline) {
        if leader := fm.FindLeader(); leader != nil {
            log.Printf("New leader elected: %s", leader.nodeID)
            return nil
        }
        time.Sleep(50 * time.Millisecond)
    }
    
    return fmt.Errorf("election timeout: no leader elected")
}

// FindLeader returns current leader node
func (fm *FailoverManager) FindLeader() *ClusterNode {
    for _, node := range fm.cluster {
        if node.isLeader {
            return node
        }
    }
    return nil
}

// PromoteFollower promotes follower to leader
func (fm *FailoverManager) PromoteFollower(nodeID string) error {
    for _, node := range fm.cluster {
        if node.nodeID == nodeID {
            node.isLeader = true
            node.lastHeartbeat = time.Now()
            return nil
        }
    }
    return fmt.Errorf("node not found: %s", nodeID)
}
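
TriggerElection above relies on a StartElection method that is not defined in this lesson. A minimal Raft-style sketch is below; the term, votedFor, and peers fields and the requestVote call are assumptions for illustration (the ClusterNode struct above does not declare them), and a real election would also compare log completeness before granting a vote and use randomized timeouts to avoid split votes. The same term bookkeeping is what guards against split-brain: entries from a deposed leader carry a stale term and are rejected.

// StartElection is a minimal sketch: bump the term, vote for ourselves, and
// ask peers for votes. term, votedFor, peers, and requestVote are assumed.
func (n *ClusterNode) StartElection() {
    n.term++
    n.votedFor = n.nodeID
    votes := 1 // our own vote
    
    for _, peer := range n.peers {
        granted, err := n.requestVote(peer, n.term, n.nodeID) // assumed RPC
        if err == nil && granted {
            votes++
        }
    }
    
    // A majority of the full cluster (peers plus this node) wins
    if votes > (len(n.peers)+1)/2 {
        n.isLeader = true
        n.lastHeartbeat = time.Now()
    }
}

// acceptEntry sketches split-brain fencing on the follower side: replication
// from a stale term (an old leader that was partitioned away) is rejected.
func (n *ClusterNode) acceptEntry(entry *LogEntry) error {
    if entry.Term < n.term {
        return fmt.Errorf("stale term %d (current %d): rejecting entry", entry.Term, n.term)
    }
    n.term = entry.Term
    return nil
}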

Consistency Models

Strong Consistency: All clients see latest writes

Write at t0: X=100
Client A reads at t1: X=100 (always latest)
Client B reads at t1: X=100 (always latest)

Guarantee: ALL reads return latest
Trade-off: Higher latency (must check leader)
Availability: Lower (read from leader only)
Usage: Financial systems, Critical data, Strict consistency requirements

Eventual Consistency: Clients may see old data briefly

Write at t0: X=100 (to leader)
Replication delay: 100ms

Client A at t0+50ms: Reads from follower: X=? (might see old)
Client B at t0+150ms: Reads from follower: X=100 (eventually consistent)

Guarantee: Eventually all see latest
Trade-off: Lower latency, stale reads possible
Availability: Higher (can read from followers)
Usage: Social media feeds, Caches, Analytics

Causal Consistency: Causally related writes ordered

Transaction A: X=1, then Y=1 (causal relationship)
Transaction B: Reads Y=1, so must see X=1

If B reads Y, it's because A wrote Y (causally before)
So B should see A's other writes too

Monotonic Consistency: Each client sees writes in order

Client A: Write X=1
Client B: Reads X=1
Client B: Reads X (later) sees X=1 or newer

Never backward: once a client has read a value, later reads never return an older one

Implementation

type ConsistencyLevel int

const (
    ConsistencyStrong ConsistencyLevel = iota
    ConsistencyEventual
    ConsistencyMonotonic
    ConsistencyCausal
)

// ReadWithConsistency handles consistency models
func (c *Client) ReadWithConsistency(key string, level ConsistencyLevel) (string, error) {
    switch level {
    case ConsistencyStrong:
        // Read from leader (always latest)
        leader := c.cluster.FindLeader()
        if leader == nil {
            return "", fmt.Errorf("no leader available")
        }
        return c.connPool.GetConnection(leader.addr).Get(key)
        
    case ConsistencyEventual:
        // Read from any node (might be stale)
        conn := c.connPool.GetAny()
        return conn.Get(key)
        
    case ConsistencyMonotonic:
        // Read from a node at least as new as the last observed version
        if c.lastReadNode == nil {
            c.lastReadNode = c.cluster.FindLeader()
        }
        val, ver, _ := c.lastReadNode.GetWithVersion(key)
        
        if ver < c.lastReadVersion {
            // Find a node with a new-enough version and re-read from it
            for {
                node := c.cluster.RandomNode()
                v, nv, _ := node.GetWithVersion(key)
                if nv >= c.lastReadVersion {
                    val, ver = v, nv
                    c.lastReadNode = node
                    break
                }
            }
        }
        
        c.lastReadVersion = ver
        return val, nil
        
    case ConsistencyCausal:
        // Ensure causally consistent
        // (Implementation depends on vector clocks)
        return c.ReadWithVectorClocks(key)
    }
    
    return "", fmt.Errorf("unknown consistency level")
}
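
The causal case above defers to vector clocks without showing them. A minimal vector clock, one common way to track the happens-before relation that causal consistency needs, might look like the following; the type and method names are illustrative and not part of the client code above.

// VectorClock maps node IDs to logical counters.
type VectorClock map[string]uint64

// Tick increments the local counter before a node issues a write.
func (vc VectorClock) Tick(nodeID string) {
    vc[nodeID]++
}

// Merge takes the element-wise maximum; applied when a node observes a write.
func (vc VectorClock) Merge(other VectorClock) {
    for node, count := range other {
        if count > vc[node] {
            vc[node] = count
        }
    }
}

// HappensBefore reports whether vc is causally before other: no counter in vc
// exceeds other's, and at least one of other's counters is strictly larger.
func (vc VectorClock) HappensBefore(other VectorClock) bool {
    for node, count := range vc {
        if count > other[node] {
            return false
        }
    }
    for node, count := range other {
        if count > vc[node] {
            return true
        }
    }
    return false
}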

Lab 10.1: Implement Complete Replication System

Objective

Build multi-node replication with automatic failover and consistency guarantees.

Requirements

  • Leader-Follower Replication: Leader accepts writes, replicates to followers, followers apply in order
  • Failover: Detect leader failure, elect new leader from followers, redirect writes, prevent split-brain
  • Consistency: Strong consistency option (read from leader), eventual consistency option (read from any)
  • Replication Monitoring: Track replication lag, monitor health, alert on lag threshold
  • Testing: Multi-node replication, failover scenarios, write consistency, failover time < 1 second
  • Benchmarks: Write throughput, replication latency, failover time, multi-node read scaling

Starter Code

package replication

import (
    "context"
    "net"
    "sync"
    "time"
)

// ReplicationManager coordinates replication across cluster
type ReplicationManager struct {
    nodeID      string
    role        NodeRole
    nodes       map[string]*ClusterNode
    replicationMode ReplicationMode
    minReplicas int
    mu          sync.RWMutex
}

type ClusterNode struct {
    nodeID      string
    addr        string
    conn        net.Conn
    role        NodeRole
    lastHeartbeat time.Time
}

type NodeRole int

const (
    RoleFollower NodeRole = iota
    RoleLeader
)

type ReplicationMode int

const (
    ModeAsync ReplicationMode = iota
    ModeSync
    ModeSemiSync
)

// NewReplicationManager creates manager
// TODO: Implement initialization
func NewReplicationManager(nodeID string, nodes []string) *ReplicationManager {
    return nil
}

// BecomeLeader transitions to leader
// TODO: Implement leader initialization
func (rm *ReplicationManager) BecomeLeader() error {
    return nil
}

// BecomeFollower transitions to follower
// TODO: Implement follower initialization
func (rm *ReplicationManager) BecomeFollower(leaderAddr string) error {
    return nil
}

// ReplicateWrite sends write to followers
// TODO: Implement replication
func (rm *ReplicationManager) ReplicateWrite(key, value []byte) error {
    return nil
}

// HealthCheck detects failed nodes
// TODO: Implement health checking and failover
func (rm *ReplicationManager) HealthCheck() error {
    return nil
}

// GetRole returns current node role
func (rm *ReplicationManager) GetRole() NodeRole {
    rm.mu.RLock()
    defer rm.mu.RUnlock()
    return rm.role
}

// GetLeader returns current leader
// TODO: Implement leader lookup
func (rm *ReplicationManager) GetLeader() *ClusterNode {
    return nil
}

Test Template

func TestLeaderFollowerReplication(t *testing.T) {
    cluster := NewTestCluster(3)
    defer cluster.Stop()
    
    leader := cluster.GetLeader()
    assert.NotNil(t, leader)
    
    // Write to leader
    err := leader.Put([]byte("key"), []byte("value"))
    assert.NoError(t, err)
    
    // Wait for replication
    time.Sleep(100 * time.Millisecond)
    
    // Verify on followers
    for _, follower := range cluster.GetFollowers() {
        val, _ := follower.Get([]byte("key"))
        assert.Equal(t, "value", string(val))
    }
}

func TestFailover(t *testing.T) {
    cluster := NewTestCluster(3)
    defer cluster.Stop()
    
    leader := cluster.GetLeader()
    leader.Put([]byte("key"), []byte("value"))
    
    // Kill leader
    cluster.KillNode(leader.ID())
    
    // New leader elected
    time.Sleep(1500 * time.Millisecond)
    
    newLeader := cluster.GetLeader()
    assert.NotNil(t, newLeader)
    assert.NotEqual(t, leader.ID(), newLeader.ID())
    
    // Verify data persisted
    val, _ := newLeader.Get([]byte("key"))
    assert.Equal(t, "value", string(val))
}

func TestSemiSyncReplication(t *testing.T) {
    rm := NewReplicationManager("node1", []string{"node2", "node3"})
    rm.SetReplicationMode(ModeSemiSync, 1)
    
    start := time.Now()
    err := rm.ReplicateWrite([]byte("key"), []byte("value"))
    elapsed := time.Since(start)
    
    assert.NoError(t, err)
    assert.Less(t, elapsed.Milliseconds(), int64(100), "replication too slow")
}

func BenchmarkReplicationThroughput(b *testing.B) {
    cluster := NewTestCluster(3)
    defer cluster.Stop()
    
    leader := cluster.GetLeader()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        leader.Put([]byte("key"), []byte("value"))
    }
}

Acceptance Criteria

  • ✅ Writes replicate to all followers
  • ✅ Failover detects leader failure within 1 second
  • ✅ New leader elected automatically
  • ✅ No data loss after failover
  • ✅ Strong consistency option working (read from leader)
  • ✅ Eventual consistency option working (read from any)
  • ✅ Concurrent writes succeed
  • ✅ Replication lag monitored
  • ✅ > 85% code coverage
  • ✅ 1000+ writes/sec throughput
  • ✅ <500ms failover time

Summary: Week 10 Complete

By completing Week 10, you've learned and implemented:

1. Replication Strategies

  • Leader-follower architecture
  • Asynchronous vs synchronous replication
  • Semi-synchronous for balance
  • Replication lag monitoring

2. Automatic Failover

  • Leader failure detection
  • Automatic leader election
  • Split-brain prevention
  • Data consistency guarantees

3. Consistency Models

  • Strong consistency (reads always see the latest write)
  • Eventual consistency (stale reads possible)
  • Causal consistency (causally related writes stay ordered)
  • Monotonic consistency (reads never go backward)

4. High Availability

  • Multi-node replication
  • Health monitoring
  • Automatic recovery
  • Performance optimization

Metrics Achieved:

  • ✅ <100ms replication lag
  • ✅ <500ms failover time
  • ✅ 1000+ writes/sec throughput
  • ✅ 3-9 node clusters tested
  • ✅ No data loss after failover

Module 5 Complete: Advanced Features

Congratulations! You now have advanced database features:

✅ Week 9: Transactions with MVCC

  • ACID properties implementation
  • Snapshot isolation
  • Conflict detection
  • Deadlock prevention

✅ Week 10: Replication & High Availability

  • Leader-follower replication
  • Automatic failover
  • Consistency models
  • Multi-node clusters

Ready for Module 6?

Next we'll focus on production readiness with monitoring, metrics, performance optimization, and deployment strategies.

Continue to Module 6: Production Readiness →