Course Navigation
← Back to Course Overview · All Lessons
 ✓ 
 Introduction and Database Fundamentals  ✓ 
 Building the Core Data Structure  ✓ 
 Concurrency and Thread Safety  ✓ 
 Append-Only Log (Write-Ahead Log)  ✓ 
 SSTables and LSM Trees  ✓ 
 Compaction and Optimization  ✓ 
 TCP Server and Protocol Design  8 
 Client Library and Advanced Networking  9 
 Transactions and ACID Properties  10 
 Replication and High Availability  11 
 Monitoring, Metrics, and Observability  12 
 Performance Optimization and Tuning  13 
 Configuration and Deployment  14 
 Security and Production Hardening  15 
 Final Project and Beyond
Current Lesson
  8 of 15 
  Progress 53% 
 Client Library and Advanced Networking
Learning Objectives
- • Master connection pooling for high-performance clients
- • Implement intelligent retry logic with exponential backoff
- • Build circuit breakers to prevent cascading failures
- • Use pipelining to achieve 10x throughput improvements
- • Handle network errors and timeouts gracefully
- • Create production-ready client libraries
Lesson 8.1: Client Implementation
Connection Management
A client needs to manage connections to the database server efficiently. Let's start with basic connection handling and evolve to production-grade pooling.
Basic Connection
// Client is the simplest possible client: it owns exactly one TCP
// connection (no pooling) plus buffered reader/writer wrappers around it.
type Client struct {
    conn net.Conn // the single underlying TCP connection

    reader *bufio.Reader // buffered reads from conn
    writer *bufio.Writer // buffered writes to conn
}
// NewClient dials addr over TCP and wraps the connection in buffered I/O.
// The dial error is returned unchanged if the connection cannot be opened.
func NewClient(addr string) (*Client, error) {
    nc, dialErr := net.Dial("tcp", addr)
    if dialErr != nil {
        return nil, dialErr
    }

    client := &Client{conn: nc}
    client.reader = bufio.NewReader(nc)
    client.writer = bufio.NewWriter(nc)
    return client, nil
}
// Get sends a RESP GET command for key and returns the server's reply.
//
// A RESP null bulk string ("$-1") is reported as ("", nil), so a missing key
// is indistinguishable from an empty value. A RESP error reply ("-ERR ...")
// is returned as a Go error carrying the server's message. Any I/O or
// protocol-parsing failure is returned as an error rather than ignored.
func (c *Client) Get(key string) (string, error) {
    // Encode GET as a RESP array of two bulk strings.
    cmd := fmt.Sprintf("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n", len(key), key)
    if _, err := c.writer.WriteString(cmd); err != nil {
        return "", err
    }
    if err := c.writer.Flush(); err != nil {
        return "", err
    }

    marker, err := c.reader.ReadByte()
    if err != nil {
        return "", err
    }

    switch marker {
    case '-':
        // Error reply: "-<message>\r\n". Read from c.reader itself: wrapping it
        // in a fresh bufio.Reader would pull later bytes into a buffer that is
        // then thrown away.
        line, err := c.reader.ReadString('\n')
        if err != nil {
            return "", err
        }
        return "", fmt.Errorf("%s", strings.TrimSpace(line))

    case '$':
        // Bulk string: "$<length>\r\n<data>\r\n", or "$-1\r\n" for null.
        line, err := c.reader.ReadString('\n')
        if err != nil {
            return "", err
        }
        length, err := strconv.Atoi(strings.TrimSpace(line))
        if err != nil {
            return "", fmt.Errorf("invalid bulk string length %q: %w", strings.TrimSpace(line), err)
        }
        if length == -1 {
            return "", nil // null bulk string
        }
        if length < 0 {
            return "", fmt.Errorf("invalid bulk string length %d", length)
        }

        // Read the payload plus its trailing CRLF. io.ReadFull keeps reading
        // until the buffer is full; a single Read may return fewer bytes.
        buf := make([]byte, length+2)
        if _, err := io.ReadFull(c.reader, buf); err != nil {
            return "", err
        }
        if buf[length] != '\r' || buf[length+1] != '\n' {
            return "", fmt.Errorf("bulk string of length %d not terminated by CRLF", length)
        }
        return string(buf[:length]), nil

    default:
        return "", fmt.Errorf("unexpected response type: %c", marker)
    }
}
func (c *Client) Close() error {
    return c.conn.Close()
}Problem: One connection per client means:
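- • Limited concurrency (the single connection serves one request at a time and blocks while waiting for each reply)
- • Connection setup overhead whenever a new client is created per operation
- • No connection reuse across goroutines (one Client cannot safely be shared)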
Connection Pooling
Connection Pool: Maintain multiple reusable connections
// ConnPool keeps reusable connections to a single server address.
type ConnPool struct {
    addr  string        // server address used when dialing
    conns chan net.Conn // idle connections; capacity is maxConns (see NewConnPool)

    maxConns    int
    idleTimeout time.Duration // not read by Get or Put

    mu     sync.Mutex // guards closed
    closed bool
}
// NewConnPool returns an open pool for addr whose idle-connection channel
// holds at most maxConns connections; idleTimeout is set to 5 minutes.
func NewConnPool(addr string, maxConns int) *ConnPool {
    cp := new(ConnPool)
    cp.addr = addr
    cp.maxConns = maxConns
    cp.conns = make(chan net.Conn, maxConns)
    cp.idleTimeout = 5 * time.Minute
    return cp
}
// Get returns a connection to cp.addr, preferring an idle pooled one.
//
// Idle connections are health-checked with testConn; any that fail are
// closed and the next idle one is tried. When no idle connection is left, a
// new one is dialed with ctx, so ctx's deadline and cancellation bound the
// dial too. If ctx is already done, ctx.Err() is returned.
//
// NOTE(review): maxConns only limits how many idle connections the channel
// can hold; it does not cap how many connections Get may dial.
func (cp *ConnPool) Get(ctx context.Context) (net.Conn, error) {
    for {
        if err := ctx.Err(); err != nil {
            return nil, err
        }
        select {
        case conn := <-cp.conns:
            if cp.testConn(conn) == nil {
                return conn, nil
            }
            // Stale connection: drop it and look at the next idle one.
            conn.Close()
        default:
            // No idle connection available: dial a fresh one under ctx.
            var d net.Dialer
            return d.DialContext(ctx, "tcp", cp.addr)
        }
    }
}
// Put hands conn back to the pool for reuse.
//
// If the pool is closed, or its idle channel is already full, conn is closed
// instead and the result of conn.Close is returned. A nil conn is ignored.
func (cp *ConnPool) Put(conn net.Conn) error {
    if conn == nil {
        return nil
    }

    // Hold mu across both the closed check and the hand-off. Releasing it in
    // between lets a concurrent close (presumably Close, which is not shown
    // here; confirm that it sets closed under mu) slip in and either strand
    // conn in a drained pool or panic on a send to a closed channel. The send
    // is non-blocking, so holding the lock here is brief.
    cp.mu.Lock()
    defer cp.mu.Unlock()

    if cp.closed {
        return conn.Close()
    }
    select {
    case cp.conns <- conn:
        return nil
    default:
        // Pool already holds its maximum number of idle connections.
        return conn.Close()
    }
}
// testConn verifies connection is still alive
func (cp *ConnPool) testConn(conn net.Conn) error {
    // Set short timeout for test
    conn.SetDeadline(time.Now().Add(100 * time.Millisecond))
    defer conn.SetDeadline(time.Time)
    
    // Send PING (simple test)
    fmt.Fprintf(conn, "*1\r\n$4\r\nPING\r\n")
    
    // Read response
    reader := bufio.NewReader(conn)
    resp, err := reader.ReadString('\n')
    return err
}Pooled Client
// PooledClient issues commands over connections borrowed from a ConnPool.
type PooledClient struct {
    timeout time.Duration // per-call context timeout used by Get
    pool    *ConnPool     // source of reusable connections
}
// NewPooledClient builds a client for addr backed by a pool sized by
// maxConns, with a 5-second per-call timeout.
func NewPooledClient(addr string, maxConns int) *PooledClient {
    const defaultTimeout = 5 * time.Second

    pc := &PooledClient{timeout: defaultTimeout}
    pc.pool = NewConnPool(addr, maxConns)
    return pc
}
// Get fetches key using a pooled connection.
//
// pc.timeout bounds the call. It bounds acquiring the connection (to the
// extent the pool honors ctx). Through a connection deadline, it also bounds
// the request/response exchange itself. On success the connection goes back
// to the pool. On any error from sendCommand the connection is closed
// instead, because it may hold a partially read reply that would corrupt the
// next caller's response.
func (pc *PooledClient) Get(key string) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), pc.timeout)
    defer cancel()

    conn, err := pc.pool.Get(ctx)
    if err != nil {
        return "", err
    }

    if deadline, ok := ctx.Deadline(); ok {
        if err := conn.SetDeadline(deadline); err != nil {
            conn.Close()
            return "", err
        }
    }

    val, err := pc.sendCommand(conn, "GET", key)
    if err != nil {
        conn.Close() // never recycle a connection in an unknown state
        return "", err
    }

    // Clear the per-call deadline before the connection is reused.
    if err := conn.SetDeadline(time.Time{}); err != nil {
        conn.Close()
        return val, nil
    }
    // Put's error only reports a failed Close of a surplus connection.
    pc.pool.Put(conn)
    return val, nil
}
func (pc *PooledClient) Close() error {
    return pc.pool.Close()
}Retry Logic and Exponential Backoff
Problem: Transient failures (temporary network issues) often succeed if the operation is simply retried after a short, growing wait:
Attempt 1: Connection refused
Attempt 2 (wait 10ms): Connection refused
Attempt 3 (wait 20ms): Success!
// RetryConfig configures retry behavior
type RetryConfig struct {
    // MaxAttempts is the total number of calls Retry makes, including the first.
    MaxAttempts int

    // InitialWait is the pause before the second attempt. Retry doubles it
    // after each retry while it is below MaxWait.
    InitialWait time.Duration

    // MaxWait caps the pause between attempts.
    MaxWait time.Duration
}
// Retry calls fn until it succeeds, returns a non-retryable error (per
// isRetryable), or the attempts run out. Between attempts it sleeps with
// exponential backoff: InitialWait, then doubling each time, capped at MaxWait.
// No sleep happens before the first attempt or after the last one.
//
// fn is always called at least once: a MaxAttempts below 1 is treated as 1.
// That way the final error always wraps a real failure instead of a nil error.
//
// A non-retryable error is returned unwrapped. Running out of attempts
// returns "max retries exceeded: <last error>", wrapped with %w.
func Retry(cfg RetryConfig, fn func() error) error {
    attempts := cfg.MaxAttempts
    if attempts < 1 {
        attempts = 1
    }

    var lastErr error
    wait := cfg.InitialWait

    for attempt := 0; attempt < attempts; attempt++ {
        if attempt > 0 {
            time.Sleep(wait)
            // Exponential backoff: double the wait, never exceeding MaxWait.
            if wait < cfg.MaxWait {
                wait *= 2
                if wait > cfg.MaxWait {
                    wait = cfg.MaxWait
                }
            }
        }

        err := fn()
        if err == nil {
            return nil
        }
        if !isRetryable(err) {
            return err
        }
        lastErr = err
    }

    return fmt.Errorf("max retries exceeded: %w", lastErr)
}
// isRetryable determines if error should trigger retry
func isRetryable(err error) bool {
    if err == nil {
        return false
    }
    
    // Network errors are retryable
    if ne, ok := err.(net.Error); ok {
        return ne.Temporary() || ne.Timeout()
    }
    
    // Check error message
    errStr := err.Error()
    retryable := []string{
        "connection refused",
        "connection reset",
        "i/o timeout",
        "temporary failure",
    }
    
    for _, msg := range retryable {
        if strings.Contains(strings.ToLower(errStr), msg) {
            return true
        }
    }
    
    return false
}Circuit Breaker Pattern
Problem: A retry loop can overwhelm a failing server
Server crashes:
  Client retries → Sends traffic to dead server
  More clients retry → Thundering herd
  Server recovers but is immediately overloaded
Solution: Circuit Breaker - Stops sending requests when failures exceed a threshold
// CircuitBreaker stops calling a failing dependency once maxFailures
// consecutive errors have been seen. It lets traffic through again after
// resetTimeout has elapsed.
type CircuitBreaker struct {
    // Configuration, fixed at construction.
    maxFailures  int
    resetTimeout time.Duration

    // Mutable state, guarded by mu.
    mu           sync.RWMutex
    state        CircuitState
    failures     int
    lastFailTime time.Time
}
// CircuitState is the breaker's current mode; the zero value is CircuitClosed.
type CircuitState int

const (
    // CircuitClosed: requests flow normally.
    CircuitClosed CircuitState = iota
    // CircuitOpen: too many failures; requests are rejected immediately.
    CircuitOpen
    // CircuitHalfOpen: the reset timeout elapsed; requests probe for recovery.
    CircuitHalfOpen
)
// NewCircuitBreaker returns a closed breaker that opens after maxFailures
// consecutive failures and lets a call through again once resetTimeout
// has passed since the last failure.
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
    cb := new(CircuitBreaker)
    cb.maxFailures, cb.resetTimeout = maxFailures, resetTimeout
    cb.state = CircuitClosed
    return cb
}
// Do executes operation, managing circuit state
func (cb *CircuitBreaker) Do(fn func() error) error {
    cb.mu.Lock()
    
    // Check if should reset from Open to HalfOpen
    if cb.state == CircuitOpen {
        if time.Since(cb.lastFailTime) > cb.resetTimeout {
            cb.state = CircuitHalfOpen
            cb.failures = 0
        } else {
            cb.mu.Unlock()
            return ErrCircuitOpen
        }
    }
    
    cb.mu.Unlock()
    
    // Execute operation
    err := fn()
    
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    if err != nil {
        cb.failures++
        cb.lastFailTime = time.Now()
        
        if cb.failures >= cb.maxFailures {
            cb.state = CircuitOpen
            return fmt.Errorf("circuit open after %d failures: %w", cb.failures, err)
        }
        
        return err
    }
    
    // Success: reset circuit
    cb.failures = 0
    if cb.state == CircuitHalfOpen {
        cb.state = CircuitClosed
    }
    
    return nil
}Lesson 8.2: Advanced Networking Features
Pipelining for Throughput
Problem: Without pipelining, each command waits for the previous response:
Client sends command 1  →
                        ← Server sends response 1
Client sends command 2  →
                        ← Server sends response 2
Client sends command 3  →
                        ← Server sends response 3
Total latency (10ms round trip): 3 × 10ms = 30ms
Solution: Pipelining sends multiple commands before waiting for responses
Client sends commands 1,2,3 →
                             ← Server sends responses 1,2,3
Latency: 1 × 10ms = 10ms (3x faster!)
// Pipeline sends multiple commands without waiting
type Pipeline struct {
    conn net.Conn // connection the batch is sent over

    // Buffered wrappers around conn: every queued command is written before
    // a single Flush.
    writer *bufio.Writer
    reader *bufio.Reader

    commands []*Command // queued by Add, drained by Execute
}
// Command is one queued pipeline request, together with the channels its
// outcome is delivered on.
type Command struct {
    Name string   // command name, e.g. "GET"
    Args []string // command arguments

    // Response receives the parsed reply. The empty interface must be
    // written `any` (or `interface{}`) for this to be valid Go.
    Response chan any
    // Error receives the failure for this command if no reply was read.
    Error chan error
}
// NewPipeline wraps conn for batched use; commands stay queued until Execute.
func NewPipeline(conn net.Conn) *Pipeline {
    p := &Pipeline{conn: conn}
    p.reader = bufio.NewReader(conn)
    p.writer = bufio.NewWriter(conn)
    p.commands = []*Command{}
    return p
}
// Add queues command for execution
func (p *Pipeline) Add(name string, args ...string) <-chan interface {
    cmd := &Command{
        Name:     name,
        Args:     args,
        Response: make(chan interface, 1),
        Error:    make(chan error, 1),
    }
    
    p.commands = append(p.commands, cmd)
    return cmd.Response
}
// Execute sends all queued commands and reads responses
func (p *Pipeline) Execute(ctx context.Context) error {
    // Send all commands
    for _, cmd := range p.commands {
        req := p.formatCommand(cmd)
        if _, err := p.writer.WriteString(req); err != nil {
            return err
        }
    }
    
    if err := p.writer.Flush(); err != nil {
        return err
    }
    
    // Read all responses
    for _, cmd := range p.commands {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        
        resp, err := p.readResponse()
        if err != nil {
            cmd.Error <- err
        } else {
            cmd.Response <- resp
        }
    }
    
    p.commands = p.commands[:0]  // Clear for next pipeline
    return nil
}Multiplexing Connections
Problem: Multiple goroutines sharing one connection interfere with each other
// BAD: Race condition
// Illustrative pseudo-code (not compilable): two goroutines share one
// connection, and each writes a request and then reads "its" reply.
conn := shared connection
go func() {
    conn.Write(command1)
    conn.Read(response1) // may receive the reply meant for command2
}()
go func() {
    conn.Write(command2)
    conn.Read(response2) // may receive the reply meant for command1
}()
// Commands/responses may interleave!
Solution: Multiplex with request ID matching
// Multiplexer lets several goroutines share one connection. Each request is
// tagged with an ID, and a reply channel is registered under that ID. The
// code that reads replies and dispatches them is not shown in this lesson.
type Multiplexer struct {
    conn      net.Conn
    requestID uint64              // next ID to hand out; guarded by mu
    responses map[uint64]chan any // pending reply channels by request ID; guarded by mu
    mu        sync.Mutex
}
// MuxRequest is one multiplexed request. ID is the unique tag that routes
// the reply back to its sender; Name and Args are the command itself.
type MuxRequest struct {
    ID uint64

    Name string
    Args []string
}
// Send sends request and returns channel for response
func (m *Multiplexer) Send(name string, args ...string) (<-chan interface, error) {
    m.mu.Lock()
    
    requestID := m.requestID
    m.requestID++
    
    respCh := make(chan interface, 1)
    m.responses[requestID] = respCh
    
    m.mu.Unlock()
    
    // Send request with ID
    req := fmt.Sprintf("%d %s %s\r\n", requestID, name, strings.Join(args, " "))
    _, err := m.conn.Write([]byte(req))
    if err != nil {
        m.mu.Lock()
        delete(m.responses, requestID)
        m.mu.Unlock()
        return nil, err
    }
    
    return respCh, nil
}Request Timeouts
// Timeout ensures requests don't hang forever
func (pc *PooledClient) GetWithTimeout(key string, timeout time.Duration) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    
    resultCh := make(chan string, 1)
    errCh := make(chan error, 1)
    
    go func() {
        val, err := pc.Get(key)
        if err != nil {
            errCh <- err
        } else {
            resultCh <- val
        }
    }()
    
    select {
    case result := <-resultCh:
        return result, nil
    case err := <-errCh:
        return "", err
    case <-ctx.Done():
        return "", fmt.Errorf("timeout exceeded")
    }
}Lesson 8.3: Error Handling
Network Error Types
import "errors"
import "net"
// classifyError maps err to a coarse category label:
//
//	"none"              err is nil
//	"connection_closed" err is or wraps io.EOF
//	"timeout"           err wraps context.DeadlineExceeded, or wraps a
//	                    net.Error whose Timeout() is true
//	"temporary"         wraps a net.Error whose Temporary() is true
//	"permanent"         wraps any other net.Error
//	"unknown"           anything else
//
// net.Errors are found anywhere in the wrap chain (errors.As), so an error
// wrapped with fmt.Errorf("...: %w", err) gets the same label as the bare error.
func classifyError(err error) string {
    var ne net.Error
    switch {
    case err == nil:
        return "none"
    case errors.Is(err, io.EOF):
        return "connection_closed"
    case errors.Is(err, context.DeadlineExceeded):
        return "timeout"
    case !errors.As(err, &ne):
        return "unknown"
    case ne.Timeout():
        return "timeout"
    case ne.Temporary(): // deprecated since Go 1.18; kept for compatibility
        return "temporary"
    default:
        return "permanent"
    }
}
// Common network errors
var (
    ErrConnectionRefused = fmt.Errorf("connection refused")
    ErrConnectionReset   = fmt.Errorf("connection reset")
    ErrTimeout           = fmt.Errorf("operation timeout")
    ErrCircuitOpen       = fmt.Errorf("circuit breaker open")
)Partial Failure Handling
// Multi-key operation with partial failures
func (pc *PooledClient) MGet(keys ...string) (map[string]string, error) {
    results := make(map[string]string)
    var lastErr error
    
    for _, key := range keys {
        val, err := pc.Get(key)
        if err != nil {
            lastErr = err
            continue
        }
        results[key] = val
    }
    
    // Return partial results even if some failed
    return results, lastErr
}Client-Side Retries
// isTransient reports whether err is a transient (retry-worthy) failure
// rather than a permanent one.
//
// A net.Error anywhere in err's wrap chain qualifies when it reports
// Timeout() or Temporary(). Otherwise the error text is searched,
// case-insensitively, for known transient failures. Go renders errno values
// as text ("connection refused", "connection reset by peer", "connection
// timed out"), not as symbolic names, so those messages are listed. The
// symbolic names stay in the list because errors relayed from other systems
// can still contain them. A nil error is never transient.
func isTransient(err error) bool {
    if err == nil {
        return false
    }

    // Temporary() is deprecated since Go 1.18 but kept for compatibility.
    var ne net.Error
    if errors.As(err, &ne) && (ne.Timeout() || ne.Temporary()) {
        return true
    }

    // Fall through even for net.Errors: refused/reset connections are
    // neither Timeout() nor Temporary().
    msg := strings.ToLower(err.Error())
    for _, pattern := range []string{
        "econnrefused", "connection refused",
        "etimedout", "timed out", "i/o timeout",
        "econnreset", "connection reset",
        "temporary failure",
    } {
        if strings.Contains(msg, pattern) {
            return true
        }
    }
    return false
}
// Retry with backoff
func (pc *PooledClient) GetWithRetry(key string, maxRetries int) (string, error) {
    var lastErr error
    
    for attempt := 0; attempt < maxRetries; attempt++ {
        if attempt > 0 {
            backoff := time.Duration(math.Pow(2, float64(attempt-1))) * 100 * time.Millisecond
            time.Sleep(backoff)
        }
        
        val, err := pc.Get(key)
        if err == nil {
            return val, nil
        }
        
        if !isTransient(err) {
            return "", err  // Permanent error, don't retry
        }
        
        lastErr = err
    }
    
    return "", fmt.Errorf("max retries exceeded: %w", lastErr)
}Lab 8.1: Build Production-Ready Client Library
Objective
Implement a feature-complete, production-grade client library with pooling, retries, and advanced features.
Requirements
- • Connection Pooling: Configurable pool size, connection reuse, idle cleanup
- • Retry Logic: Exponential backoff, configurable max attempts, transient error detection
- • Circuit Breaker: Prevent cascading failures, automatic recovery, state tracking
- • Pipelining: Batch multiple commands, single flush to server, collect all responses
- • Error Handling: Proper error types, timeout handling, partial failure support
- • Testing: Test with real server, test error scenarios, test concurrent operations
Starter Code
package client
import (
    "bufio"
    "context"
    "net"
    "sync"
    "time"
)
// Client is a production-ready database client.
type Client struct {
    /* TODO: Add connection pool, circuit breaker, retry config */
}

// NewClient creates a new client for addr, configured by opts.
//
// TODO: Implement with sensible defaults.
func NewClient(addr string, opts ...Option) (*Client, error) {
    return nil, nil
}

// Get retrieves the value stored under key, with automatic retry and timeout.
//
// TODO: Implement with connection pooling.
func (c *Client) Get(key string) (string, error) {
    return "", nil
}

// Set stores value under key.
//
// TODO: Implement.
func (c *Client) Set(key, value string) error {
    return nil
}

// Delete removes key.
//
// TODO: Implement.
func (c *Client) Delete(key string) error {
    return nil
}

// Pipeline returns a new pipeline for batching commands.
//
// TODO: Implement.
func (c *Client) Pipeline() *Pipeline {
    return nil
}

// Close gracefully shuts the client down.
//
// TODO: Implement.
func (c *Client) Close() error {
    return nil
}
// Option configures client behavior
type Option func(*Client)
// WithPoolSize sets connection pool size
func WithPoolSize(size int) Option {
    return func(c *Client) {
        // TODO: Implement
    }
}
// WithTimeout sets operation timeout
func WithTimeout(t time.Duration) Option {
    return func(c *Client) {
        // TODO: Implement
    }
}
// WithRetry configures retry behavior
func WithRetry(maxAttempts int, initialWait time.Duration) Option {
    return func(c *Client) {
        // TODO: Implement
    }
}Test Template
// TestClientPooling runs 100 concurrent Set/Get round trips through a client
// whose pool holds 5 connections, and checks every read sees its write.
func TestClientPooling(t *testing.T) {
    // Start test server
    server := startTestServer()
    defer server.Stop()

    // Create client with pool size 5. NewClient returns (client, error).
    client, err := NewClient(server.Addr(), WithPoolSize(5))
    if err != nil {
        t.Fatalf("NewClient: %v", err)
    }
    defer client.Close()

    // Concurrent operations
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(idx int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", idx)
            // t.Errorf (not t.Fatalf) is safe to call from other goroutines.
            if err := client.Set(key, "value"); err != nil {
                t.Errorf("Set(%q): %v", key, err)
                return
            }
            val, err := client.Get(key)
            if err != nil {
                t.Errorf("Get(%q): %v", key, err)
                return
            }
            assert.Equal(t, "value", val)
        }(i)
    }

    wg.Wait()
}
func TestClientRetry(t *testing.T) {
    // Use flaky server that fails then succeeds
    server := startFlakyServer(failCount: 2)
    defer server.Stop()
    
    client := NewClient(server.Addr(), WithRetry(5, 10*time.Millisecond))
    
    // Should succeed after retries
    val, err := client.Get("key")
    assert.NoError(t, err)
}
func BenchmarkClientThroughput(b *testing.B) {
    server := startTestServer()
    defer server.Stop()
    
    client := NewClient(server.Addr())
    defer client.Close()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        client.Set("key", "value")
    }
}Acceptance Criteria
- ✅ All tests pass
- ✅ Connection pooling working
- ✅ Retries succeed with flaky servers
- ✅ Circuit breaker prevents cascading failures
- ✅ Pipelining achieves 10x throughput improvement
- ✅ 100K+ operations/sec on single client
- ✅ > 85% code coverage
- ✅ No goroutine leaks
Summary: Week 8 Complete
By completing Week 8, you've learned and implemented:
1. Connection Management
- • Simple connections
- • Connection pooling with reuse
- • Idle connection cleanup
- • Connection testing
2. Retry Logic
- • Exponential backoff
- • Transient error detection
- • Max retry limits
- • Backoff strategies
3. Circuit Breaker Pattern
- • Prevents cascading failures
- • State management
- • Automatic recovery
- • Failure threshold tracking
4. Advanced Features
- • Pipelining: 10x throughput
- • Multiplexing: Concurrent ops
- • Timeouts: Prevent hanging
- • Keep-alive: Detect dead conns
Metrics Achieved:
- ✅ 100K+ ops/sec with pooling
- ✅ 10x throughput with pipelining
- ✅ Sub-100ms failover with circuit breaker
- ✅ <1% error rate with retries
- ✅ Support for 1000+ concurrent clients
Module 4 Complete: Full Client-Server System
Congratulations! You now have a complete client-server system:
✅ Week 7: TCP Server
- • RESP protocol implementation
- • 10,000+ concurrent connections
- • Command execution framework
- • Graceful shutdown
✅ Week 8: Client Library
- • Connection pooling (5-10x throughput)
- • Intelligent retries (99%+ success rate)
- • Circuit breaking (prevents cascades)
- • Pipelining (10x throughput)
Ready for Module 5?
Next we'll implement advanced features including transactions with MVCC, replication, and high availability.
Continue to Module 5: Advanced Features →