Concurrency and Thread Safety

Learning Objectives

  • Understand race conditions and how to prevent them
  • Master Go's synchronization primitives (Mutex, RWMutex, atomic)
  • Implement sharded locking for high concurrency
  • Learn lock-free programming techniques
  • Build thread-safe operations with proper error handling

Lesson 3.1: Concurrency Challenges

Race Conditions in Key-Value Operations

A race condition occurs when multiple goroutines access shared data concurrently and at least one modifies it, without synchronization.

Example Race Condition

// BAD: Race condition example
type UnsafeMap struct {
    data map[string]int
}

func (m *UnsafeMap) Increment(key string) {
    val := m.data[key]  // Read
    val++
    m.data[key] = val   // Write
}

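func main() {
    m := &UnsafeMap{data: make(map[string]int)}
    m.data["counter"] = 0

    var wg sync.WaitGroup
    wg.Add(2)

    // Two goroutines racing
    go func() { defer wg.Done(); m.Increment("counter") }()  // Goroutine A
    go func() { defer wg.Done(); m.Increment("counter") }()  // Goroutine B

    wg.Wait()  // Without this, main could exit before either goroutine runs

    // Race: Both might read 0, increment to 1, write 1
    // Expected: 2, but the result may be 1. Unsynchronized map writes can
    // also crash with "fatal error: concurrent map writes".
    fmt.Println(m.data["counter"])
}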

Timeline of Race Condition

Time   Goroutine A      Goroutine B      Memory Value
t0     Read: val = 0    -                counter = 0
t1     -                Read: val = 0    counter = 0
t2     Write: val+1     -                counter = 1
t3     -                Write: val+1     counter = 1 (LOST WRITE!)
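
One way to close the window shown in the timeline is to make the read-modify-write a single critical section. The following is a minimal sketch, assuming a hypothetical SafeMap type and runIncrements helper (neither is part of the original example) that guard the map with a sync.Mutex:

```go
package main

import (
	"fmt"
	"sync"
)

// SafeMap is a hypothetical, mutex-guarded variant of UnsafeMap.
type SafeMap struct {
	mu   sync.Mutex
	data map[string]int
}

// Increment performs the read-modify-write while holding the lock,
// so no other goroutine can interleave between the read and the write.
func (m *SafeMap) Increment(key string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key]++
}

// Get returns the current value for key under the same lock.
func (m *SafeMap) Get(key string) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.data[key]
}

// runIncrements starts n goroutines that each increment "counter" once
// and returns the final value after all of them have finished.
func runIncrements(n int) int {
	m := &SafeMap{data: make(map[string]int)}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m.Increment("counter")
		}()
	}
	wg.Wait()
	return m.Get("counter")
}

func main() {
	fmt.Println(runIncrements(1000)) // prints 1000
}
```

Because every increment happens under the lock, no update is lost regardless of how the goroutines are scheduled.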

Detecting Race Conditions

Go has a built-in race detector that helps identify race conditions during development:

# Run with race detector
go test -race ./...

# Output:
# ==================
# WARNING: DATA RACE
# Write at 0x00c0001a2340 by goroutine 7:
#     main.(*UnsafeMap).Increment()
#         /path/to/file.go:12 +0x44
#
# Previous read by goroutine 6:
#     main.(*UnsafeMap).Increment()
#         /path/to/file.go:11 +0x38

How the Race Detector Works

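  • Instruments memory accesses at compile time
  • Tracks which goroutine accessed which memory
  • Reports unsynchronized concurrent reads+writes or writes+writes
  • Typical overhead: 2-20x execution time and 5-10x memory

Important: The race detector only reports races that actually occur in the code paths a run executes, so it catches most (but not all) race conditions!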

Deadlocks and How to Prevent Them

A deadlock occurs when two or more goroutines block each other, waiting for events that will never occur.

Classic Deadlock: Circular Wait

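// BAD: Deadlock example
func main() {
    ch1 := make(chan int)  // Unbuffered: a send blocks until a receiver is ready
    ch2 := make(chan int)
    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()
        ch1 <- 1      // Blocks forever: nobody is receiving on ch1
        <-ch2         // Never reached
    }()

    go func() {
        defer wg.Done()
        ch2 <- 2      // Blocks forever: nobody is receiving on ch2
        <-ch1         // Never reached
    }()

    wg.Wait()         // main blocks too, so the runtime can detect the deadlock
    fmt.Println("This never prints")
    // fatal error: all goroutines are asleep - deadlock!
}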
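
One possible way to break this circular wait is to give each channel a buffer of one, so a send can complete before the matching receive starts. This is a sketch under that assumption (the exchange function is a hypothetical name, not from the original), not the only fix: reordering the operations so one goroutine receives first would also work.

```go
package main

import (
	"fmt"
	"sync"
)

// exchange runs two goroutines that each send a value and then receive
// the other's value. The channels are buffered (capacity 1), so each send
// completes immediately and neither goroutine waits for the other to receive.
func exchange() (gotA, gotB int) {
	ch1 := make(chan int, 1)
	ch2 := make(chan int, 1)
	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		defer wg.Done()
		ch1 <- 1     // does not block: the buffer has room
		gotA = <-ch2 // receives the value sent by the other goroutine
	}()

	go func() {
		defer wg.Done()
		ch2 <- 2
		gotB = <-ch1
	}()

	wg.Wait() // both goroutines finish, so reading gotA and gotB is safe
	return gotA, gotB
}

func main() {
	a, b := exchange()
	fmt.Println(a, b) // prints 2 1
}
```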

Deadlock with Locks

// BAD: Lock ordering deadlock
type Account struct {
    mu    sync.Mutex
    value int
}

func Transfer(from, to *Account, amount int) {
    from.mu.Lock()
    defer from.mu.Unlock()
    
    to.mu.Lock()  // Could be locked by another transfer
    defer to.mu.Unlock()
    
    from.value -= amount
    to.value += amount
}

func main() {
    a := &Account{value: 100}
    b := &Account{value: 50}
    
    go Transfer(a, b, 10)  // Locks a, then b
    go Transfer(b, a, 20)  // Locks b, then a <- DEADLOCK!
}

Preventing Deadlocks

Strategy 1: Lock Ordering

Always acquire locks in the same order:

// GOOD: Consistent lock ordering
// Assumes Account also carries a unique, comparable id field.
func Transfer(from, to *Account, amount int) {
    if from == to {
        return  // Same account: locking it twice would self-deadlock
    }

    // Always lock in ID order to prevent circular wait
    first, second := from, to
    if to.id < from.id {
        first, second = to, from
    }
    first.mu.Lock()
    defer first.mu.Unlock()
    second.mu.Lock()
    defer second.mu.Unlock()

    from.value -= amount
    to.value += amount
}

Strategy 2: Timeout

Don't wait forever:

// GOOD: Timeout prevents indefinite blocking
func Transfer(from, to *Account, amount int, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    
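    // Try to acquire both locks before ctx expires. sync.Mutex has no
    // timed Lock, so poll TryLock (Go 1.18+) or use a channel-based lock.
    // Return ctx.Err() if the timeout is exceeded, nil on success.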
}

Strategy 3: Single Lock

Use one lock for related resources:

// GOOD: Single lock for related objects
type Bank struct {
    mu       sync.Mutex
    accounts map[string]*Account
}

func (b *Bank) Transfer(fromID, toID string, amount int) {
    b.mu.Lock()
    defer b.mu.Unlock()
    
    from := b.accounts[fromID]
    to := b.accounts[toID]
    
    from.value -= amount
    to.value += amount
}

Lesson 3.2: Synchronization Primitives

Mutexes and RWMutex

A mutex (mutual exclusion lock) ensures that only one goroutine can hold the lock at a time.

Mutex Guarantees

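type Mutex struct {
    // Simplified view of the internal layout (an implementation detail
    // that varies between Go versions)
    // State tracks: locked, woken, starved
    state int32
    sema  uint32  // Semaphore for goroutine parking
}

What happens when a goroutine calls Lock():

  • Runnable goroutine tries to acquire lock
  • Lock is free → Acquire immediately
  • Lock is held → Enter wait queue
  • Park goroutine (sleep) until another goroutine calls Unlock()
  • Unlock() wakes the goroutine at the head of the queue, which resumes (in normal mode it may still compete with newly arriving goroutines)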

Correct Usage Pattern

type Counter struct {
    mu    sync.Mutex
    value int
}

// Pattern 1: defer (recommended)
func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
    // defer ensures the unlock runs even if the function panics
}

// Pattern 2: manual unlock (risky)
// Named differently because a type cannot declare two methods called Increment
func (c *Counter) IncrementManual() {
    c.mu.Lock()
    c.value++
    c.mu.Unlock()
    // Easy to forget the unlock or miss it on an early return or panic
}
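
The claim that defer releases the lock even on a panic can be checked directly. This is a small sketch with hypothetical names (IncrementOrPanic, lockIsFreeAfterPanic); it uses TryLock, which requires Go 1.18 or later, purely to probe whether the mutex is free afterwards.

```go
package main

import (
	"fmt"
	"sync"
)

type Counter struct {
	mu    sync.Mutex
	value int
}

// IncrementOrPanic panics while holding the lock when fail is true.
// The deferred Unlock still runs as the panic unwinds the stack.
func (c *Counter) IncrementOrPanic(fail bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if fail {
		panic("simulated failure inside critical section")
	}
	c.value++
}

// lockIsFreeAfterPanic triggers the panic, recovers from it, and reports
// whether the mutex can be acquired again afterwards.
func lockIsFreeAfterPanic() (free bool) {
	c := &Counter{}
	func() {
		defer func() { _ = recover() }() // swallow the simulated panic
		c.IncrementOrPanic(true)
	}()
	free = c.mu.TryLock()
	if free {
		c.mu.Unlock()
	}
	return free
}

func main() {
	fmt.Println(lockIsFreeAfterPanic()) // prints true
}
```

With the manual-unlock pattern, the same panic would leave the mutex locked and every later caller would block.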

Common Mistakes

// ❌ BAD: Unlock without Lock
func (c *Counter) Bad1() {
    c.mu.Unlock()  // FATAL: "sync: unlock of unlocked mutex" (cannot be recovered)
}

// ❌ BAD: Lock twice (deadlock)
func (c *Counter) Bad2() {
    c.mu.Lock()
    c.value++
    c.mu.Lock()    // Blocks forever (same goroutine)
}

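// ❌ BAD: Copy mutex (separate locks!)
func (c *Counter) Bad3() {
    c2 := *c       // Copy of Counter, including its mutex (go vet's copylocks check flags this)
    c2.mu.Lock()   // Different lock than c.mu!
    c2.value++     // Updates the copy; c.value is unchanged
    c2.mu.Unlock()
}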

RWMutex for Read-Heavy Workloads

RWMutex allows multiple readers simultaneously but only one exclusive writer.

RWMutex Usage

type Cache struct {
    mu    sync.RWMutex
    data  map[string]string
}

// Multiple goroutines can read simultaneously
func (c *Cache) Get(key string) string {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return c.data[key]
}

// Writer must wait for all readers
func (c *Cache) Set(key, value string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.data[key] = value
}

When NOT to use RWMutex

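  • Write-heavy workloads: RWMutex bookkeeping costs more than the reader parallelism saves
  • Very short critical sections: Lock acquisition overhead dominates, so a plain Mutex is often just as fast
  • Simple cases with little read contention: Regular Mutex is simpler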

Atomic Operations

Atomic operations are indivisible operations on shared variables that don't require locks.

import "sync/atomic"

// Without atomic: Race condition
var unsafeCounter int
unsafeCounter++  // Read-modify-write is three separate steps, not atomic

// With atomic: Safe
var counter int64
atomic.AddInt64(&counter, 1)  // Single indivisible read-modify-write

Atomic Operations for Common Patterns

// Counter
var ops int64
atomic.AddInt64(&ops, 1)
val := atomic.LoadInt64(&ops)

// Flag
var enabled int32  // 0 = false, 1 = true
atomic.StoreInt32(&enabled, 1)
if atomic.LoadInt32(&enabled) == 1 {
    // enabled is true
}

// Compare-and-swap (optimistic locking)
var value int32 = 10
swapped := atomic.CompareAndSwapInt32(&value, 10, 20)
// If value was 10, set to 20 and return true
// Otherwise return false
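
Compare-and-swap is usually wrapped in a retry loop: load the current value, compute the new one, and retry if another goroutine changed the value in between. The sketch below tracks a running maximum this way; updateMax and concurrentMax are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// updateMax raises *addr to v if v is larger, retrying when another
// goroutine changes *addr between the load and the compare-and-swap.
func updateMax(addr *int64, v int64) {
	for {
		cur := atomic.LoadInt64(addr)
		if v <= cur {
			return // current maximum is already >= v
		}
		if atomic.CompareAndSwapInt64(addr, cur, v) {
			return // our update won
		}
		// CAS failed: *addr changed underneath us, so reload and retry
	}
}

// concurrentMax feeds 1..n into updateMax from n goroutines.
func concurrentMax(n int) int64 {
	var highest int64
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(v int64) {
			defer wg.Done()
			updateMax(&highest, v)
		}(int64(i))
	}
	wg.Wait()
	return atomic.LoadInt64(&highest)
}

func main() {
	fmt.Println(concurrentMax(100)) // prints 100
}
```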

Performance: Atomic vs Mutex

func BenchmarkCounter(b *testing.B) {
    // Mutex version
    b.Run("Mutex", func(b *testing.B) {
        var mu sync.Mutex
        var counter int
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                mu.Lock()
                counter++
                mu.Unlock()
            }
        })
    })
    
    // Atomic version
    b.Run("Atomic", func(b *testing.B) {
        var counter int64
        b.RunParallel(func(pb *testing.PB) {
            for pb.Next() {
                atomic.AddInt64(&counter, 1)
            }
        })
    })
}

// Illustrative results (actual numbers depend on hardware, core count, and contention):
// BenchmarkCounter/Mutex-8    2,000,000 ops/sec
// BenchmarkCounter/Atomic-8  500,000,000 ops/sec (250x faster!)

When to use atomic:

  • Simple counters (no complex operations)
  • Flag variables
  • Compare-and-swap optimizations
  • Metrics collection

When NOT to use atomic:

  • Multi-step operations that must appear as one indivisible unit
  • Multiple related variables that must change together
  • State machines
  • Code that depends on a specific ordering across several operations
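
To illustrate the "multiple related variables" case: a running sum and count must change together, so two independent atomics could let a reader see one updated and not the other. A minimal sketch with a hypothetical Stats type, using one mutex for both fields:

```go
package main

import (
	"fmt"
	"sync"
)

// Stats tracks a running sum and count. The two fields must change
// together, so one mutex guards both rather than two separate atomics.
type Stats struct {
	mu    sync.Mutex
	sum   int64
	count int64
}

// Record adds one observation, updating both fields in one critical section.
func (s *Stats) Record(v int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.sum += v
	s.count++
}

// Mean reads both fields under the lock, so it never sees a sum that
// includes an observation the count does not.
func (s *Stats) Mean() float64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.count == 0 {
		return 0
	}
	return float64(s.sum) / float64(s.count)
}

// recordAll records the value v from n goroutines and returns the Stats.
func recordAll(n int, v int64) *Stats {
	s := &Stats{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Record(v)
		}()
	}
	wg.Wait()
	return s
}

func main() {
	s := recordAll(100, 10)
	fmt.Println(s.Mean()) // prints 10
}
```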

Lesson 3.3: Concurrent Operations

Fine-grained vs Coarse-grained Locking

Locking granularity refers to the scope of data protected by a lock.

Coarse-grained Locking (Simple, Slower)

type CoarseStore struct {
    mu   sync.RWMutex
    data map[string][]byte
}

func (s *CoarseStore) Get(key string) []byte {
    s.mu.RLock()        // Lock entire map
    defer s.mu.RUnlock()
    return s.data[key]
}

  • Lock duration: Entire operation
  • Contention: All goroutines contend for one lock
  • Requests for different keys still block each other

Fine-grained Locking (Complex, Faster)

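// Each key owns an entry holding its value and the lock that guards it.
// (Go maps are not safe for concurrent writes, even to different keys,
// so per-key locks cannot protect a single shared data map.)
type entry struct {
    mu    sync.RWMutex
    value []byte
}

type FineStore struct {
    mu      sync.RWMutex      // Top-level lock: guards the entries map
    entries map[string]*entry
}

func (s *FineStore) Get(key string) []byte {
    s.mu.RLock()            // Brief lock on entries map
    e := s.entries[key]
    s.mu.RUnlock()
    if e == nil {
        return nil          // Key not present
    }

    e.mu.RLock()            // Lock only this key
    defer e.mu.RUnlock()
    return e.value
}

  • Lock duration: Brief lock acquisition + individual key lock
  • Contention: Only operations on same key contend
  • Different keys proceed in parallel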
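
The write path is where per-key locking gets subtle, because the per-key lock itself has to be created safely. The following sketch assumes a hypothetical layout in which each key owns an entry with its own lock and value; KeyLockedStore and getOrCreate are illustrative names. It uses a double-checked lookup: a fast path under the read lock, then a re-check under the write lock before creating the entry.

```go
package main

import (
	"fmt"
	"sync"
)

// entry pairs one value with the lock that guards it (hypothetical layout).
type entry struct {
	mu    sync.RWMutex
	value []byte
}

// KeyLockedStore guards the entries map with mu and each value with its own lock.
type KeyLockedStore struct {
	mu      sync.RWMutex
	entries map[string]*entry
}

func NewKeyLockedStore() *KeyLockedStore {
	return &KeyLockedStore{entries: make(map[string]*entry)}
}

// getOrCreate returns the entry for key, creating it if needed.
func (s *KeyLockedStore) getOrCreate(key string) *entry {
	s.mu.RLock()
	e := s.entries[key]
	s.mu.RUnlock()
	if e != nil {
		return e // fast path: entry already exists
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	// Re-check: another goroutine may have created it between the two locks.
	if e = s.entries[key]; e == nil {
		e = &entry{}
		s.entries[key] = e
	}
	return e
}

// Set stores a private copy of value under the key's own lock.
func (s *KeyLockedStore) Set(key string, value []byte) {
	e := s.getOrCreate(key)
	e.mu.Lock()
	defer e.mu.Unlock()
	e.value = append([]byte(nil), value...)
}

// Get returns the value and whether an entry for key exists.
func (s *KeyLockedStore) Get(key string) ([]byte, bool) {
	s.mu.RLock()
	e := s.entries[key]
	s.mu.RUnlock()
	if e == nil {
		return nil, false
	}
	e.mu.RLock()
	defer e.mu.RUnlock()
	return e.value, true
}

func main() {
	s := NewKeyLockedStore()
	s.Set("a", []byte("1"))
	v, ok := s.Get("a")
	fmt.Println(string(v), ok) // prints 1 true
	_, ok = s.Get("missing")
	fmt.Println(ok) // prints false
}
```

One limitation of this sketch: a Get that runs between entry creation and the first Set for that key reports the key as present with a nil value. Entries are also never removed, so the map of locks grows with the key space, which is part of why sharding is often preferred.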

Sharding for Parallelism

Sharding divides data into independent partitions, each with its own lock.

type ShardedStore struct {
    shards []*storeShard
}

type storeShard struct {
    mu   sync.RWMutex
    data map[string][]byte
}

func (s *ShardedStore) getShard(key string) *storeShard {
    hash := fnv.New64a()
    hash.Write([]byte(key))
    index := int(hash.Sum64() % uint64(len(s.shards)))
    return s.shards[index]
}

func (s *ShardedStore) Get(key string) []byte {
    shard := s.getShard(key)
    shard.mu.RLock()
    defer shard.mu.RUnlock()
    return shard.data[key]
}

Shard Count Selection

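Too few shards (e.g., 2):

  • Still high contention
  • Not much parallelism

Too many shards (e.g., 10000):

  • Memory overhead
  • Cache unfriendly
  • Initialization cost

Sweet spot: a moderate power of two (2^N). The power of two does not improve distribution by itself, but it lets the shard index be computed with a bitmask instead of a modulo. Illustrative throughput (actual numbers depend on hardware and workload):

  • 4 shards: 2M ops/sec
  • 8 shards: 3M ops/sec
  • 16 shards: 4M ops/sec (diminishing returns)
  • 32 shards: 4.1M ops/sec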

Lock Striping Techniques

Lock striping is a technique where multiple locks protect disjoint parts of data, improving concurrency.

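// Hash-based Striping
type StripedMap struct {
    locks [16]sync.Mutex
    data  [16]map[string]interface{}  // Each map must be created with make before writes
}

// hash is a small helper built on FNV-1a from "hash/fnv"
func hash(key string) uint32 {
    h := fnv.New32a()
    h.Write([]byte(key))
    return h.Sum32()
}

func (m *StripedMap) lockIndex(key string) int {
    h := hash(key)
    return int(h % 16)
}

func (m *StripedMap) Get(key string) interface{} {
    idx := m.lockIndex(key)
    m.locks[idx].Lock()
    defer m.locks[idx].Unlock()
    return m.data[idx][key]
}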

Benefits of Lock Striping

Without striping (one lock):

  • 100 goroutines → sequential
  • Throughput: 100K ops/sec (illustrative)

With 16 lock stripes:

  • 100 goroutines → distributed across 16 locks
  • Average of about 6 goroutines per lock
  • Throughput: 1.2M ops/sec (12x improvement, illustrative)

Complete Thread-Safe Implementation

package store

import (
    "context"
    "errors"
    "hash/fnv"
    "sync"
)

var (
    ErrKeyNotFound = errors.New("key not found")
    ErrEmptyKey    = errors.New("key cannot be empty")
)

// Store interface
type Store interface {
    Get(ctx context.Context, key []byte) ([]byte, error)
    Put(ctx context.Context, key, value []byte) error
    Delete(ctx context.Context, key []byte) error
    Close() error
}

// ThreadSafeStore is a sharded, thread-safe key-value store
type ThreadSafeStore struct {
    shards []*storeShard
}

type storeShard struct {
    mu   sync.RWMutex
    data map[string][]byte
}

// NewThreadSafeStore creates a new thread-safe store
func NewThreadSafeStore(shardCount int) *ThreadSafeStore {
    if shardCount <= 0 {
        shardCount = 16  // Default
    }
    
    shards := make([]*storeShard, shardCount)
    for i := 0; i < shardCount; i++ {
        shards[i] = &storeShard{
            data: make(map[string][]byte),
        }
    }
    
    return &ThreadSafeStore{shards: shards}
}

// getShard returns the shard for a given key
func (s *ThreadSafeStore) getShard(key []byte) *storeShard {
    h := fnv.New64a()
    h.Write(key)
    index := int(h.Sum64() % uint64(len(s.shards)))
    return s.shards[index]
}

// Get retrieves a value by key
func (s *ThreadSafeStore) Get(ctx context.Context, key []byte) ([]byte, error) {
    if len(key) == 0 {
        return nil, ErrEmptyKey
    }
    
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }
    
    shard := s.getShard(key)
    shard.mu.RLock()
    defer shard.mu.RUnlock()
    
    val, exists := shard.data[string(key)]
    if !exists {
        return nil, ErrKeyNotFound
    }
    
    // Return copy to prevent external modification
    result := make([]byte, len(val))
    copy(result, val)
    return result, nil
}

// Put stores a value
func (s *ThreadSafeStore) Put(ctx context.Context, key, value []byte) error {
    if len(key) == 0 {
        return ErrEmptyKey
    }
    if value == nil {
        return errors.New("value cannot be nil")
    }
    
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }
    
    shard := s.getShard(key)
    shard.mu.Lock()
    defer shard.mu.Unlock()
    
    // Store copy to prevent external modification
    valCopy := make([]byte, len(value))
    copy(valCopy, value)
    shard.data[string(key)] = valCopy
    
    return nil
}

// Delete removes a key
func (s *ThreadSafeStore) Delete(ctx context.Context, key []byte) error {
    if len(key) == 0 {
        return ErrEmptyKey
    }
    
    select {
    case <-ctx.Done():
        return ctx.Err()
    default:
    }
    
    shard := s.getShard(key)
    shard.mu.Lock()
    defer shard.mu.Unlock()
    
    delete(shard.data, string(key))
    return nil
}

// Close closes the store
func (s *ThreadSafeStore) Close() error {
    // For in-memory store, nothing to do
    return nil
}

Hands-on Lab: Add thread-safe operations and implement sharded storage

Build a complete thread-safe, sharded key-value store with concurrent access.

Lab Tasks:

  1. Implement sharding with FNV-1a hash function
  2. Add RWMutex for read-heavy operations
  3. Ensure no race conditions (verify with -race)
  4. Handle context cancellation properly
  5. Write comprehensive concurrent tests
  6. Benchmark different shard counts (1, 4, 8, 16)
  7. Measure throughput at different concurrency levels
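
As a starting point for the concurrent-test task, here is a sketch of a small stress harness. The kv interface and the lockedMap stand-in are hypothetical and exist only so the example runs on its own; in the lab you would adapt hammer to your own store and run it under go test -race.

```go
package main

import (
	"fmt"
	"sync"
)

// kv is a hypothetical minimal interface standing in for the lab's store.
type kv interface {
	Put(key string, value []byte)
	Get(key string) ([]byte, bool)
}

// lockedMap is a stand-in implementation so the sketch runs on its own.
type lockedMap struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func (m *lockedMap) Put(key string, value []byte) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = value
}

func (m *lockedMap) Get(key string) ([]byte, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.data[key]
	return v, ok
}

// hammer starts workers goroutines; each writes perWorker distinct keys
// and reads each one back. It returns the number of failed read-backs.
func hammer(s kv, workers, perWorker int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex // guards failures
	failures := 0
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				key := fmt.Sprintf("w%d-k%d", w, i)
				s.Put(key, []byte(key))
				if v, ok := s.Get(key); !ok || string(v) != key {
					mu.Lock()
					failures++
					mu.Unlock()
				}
			}
		}(w)
	}
	wg.Wait()
	return failures
}

func main() {
	s := &lockedMap{data: make(map[string][]byte)}
	fmt.Println(hammer(s, 8, 100)) // prints 0
}
```

Each worker uses its own key prefix, so a failed read-back points at a store bug rather than at two workers overwriting each other.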

Summary & Learning Outcomes

What You've Learned:

  • How to identify and prevent race conditions
  • Go's synchronization primitives (Mutex, RWMutex, atomic)
  • Sharding strategies for high concurrency
  • Lock striping techniques for better performance
  • Thread-safe implementation patterns
  • Deadlock prevention strategies

Key Takeaways

  • Always use the race detector during development
  • Choose the right locking strategy for your workload
  • Sharding dramatically improves concurrent performance
  • Atomic operations are much faster than mutexes for simple operations
  • Context cancellation is essential for responsive systems