Client Library and Advanced Networking

Learning Objectives

  • • Master connection pooling for high-performance clients
  • • Implement intelligent retry logic with exponential backoff
  • • Build circuit breakers to prevent cascading failures
  • • Use pipelining to achieve 10x throughput improvements
  • • Handle network errors and timeouts gracefully
  • • Create production-ready client libraries

Lesson 8.1: Client Implementation

Connection Management

A client needs to manage connections to the database server efficiently. Let's start with basic connection handling and evolve to production-grade pooling.

Basic Connection

// Simple client without pooling
type Client struct {
    conn net.Conn
    reader *bufio.Reader
    writer *bufio.Writer
}

func NewClient(addr string) (*Client, error) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    
    return &Client{
        conn: conn,
        reader: bufio.NewReader(conn),
        writer: bufio.NewWriter(conn),
    }, nil
}

func (c *Client) Get(key string) (string, error) {
    // Send GET command
    cmd := fmt.Sprintf("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n", len(key), key)
    if _, err := c.writer.WriteString(cmd); err != nil {
        return "", err
    }
    if err := c.writer.Flush(); err != nil {
        return "", err
    }
    
    // Parse response
    marker, _ := c.reader.ReadByte()
    
    if marker == '-' {
        // Error response
        line, _ := bufio.NewReader(c.reader).ReadString('\n')
        return "", fmt.Errorf(line)
    }
    
    if marker == '$' {
        // Bulk string
        line, _ := bufio.NewReader(c.reader).ReadString('\n')
        length, _ := strconv.Atoi(strings.TrimSpace(line))
        
        if length == -1 {
            return "", nil  // Null
        }
        
        data := make([]byte, length)
        c.reader.Read(data)
        c.reader.ReadBytes('\n')  // Read trailing \r\n
        
        return string(data), nil
    }
    
    return "", fmt.Errorf("unexpected response type: %c", marker)
}

func (c *Client) Close() error {
    return c.conn.Close()
}

Problem: One connection per client means:

  • • Limited concurrency (connection blocks on read)
  • • New connection overhead for each operation
  • • No connection reuse

Connection Pooling

Connection Pool: Maintain multiple reusable connections

// Pool of connections to single server
type ConnPool struct {
    addr        string
    conns       chan net.Conn
    maxConns    int
    idleTimeout time.Duration
    closed      bool
    mu          sync.Mutex
}

func NewConnPool(addr string, maxConns int) *ConnPool {
    return &ConnPool{
        addr:        addr,
        conns:       make(chan net.Conn, maxConns),
        maxConns:    maxConns,
        idleTimeout: 5 * time.Minute,
    }
}

// Get retrieves connection from pool or creates new
func (cp *ConnPool) Get(ctx context.Context) (net.Conn, error) {
    select {
    case conn := <-cp.conns:
        // Connection available from pool
        
        // Test if still alive (optional)
        if err := cp.testConn(conn); err == nil {
            return conn, nil
        }
        conn.Close()
        // Fall through to create new
        
    case <-ctx.Done():
        return nil, ctx.Err()
    default:
    }
    
    // Create new connection
    conn, err := net.Dial("tcp", cp.addr)
    if err != nil {
        return nil, err
    }
    
    return conn, nil
}

// Put returns connection to pool
func (cp *ConnPool) Put(conn net.Conn) error {
    cp.mu.Lock()
    if cp.closed {
        cp.mu.Unlock()
        return conn.Close()
    }
    cp.mu.Unlock()
    
    select {
    case cp.conns <- conn:
        return nil
    default:
        // Pool full, close connection
        return conn.Close()
    }
}

// testConn verifies connection is still alive
func (cp *ConnPool) testConn(conn net.Conn) error {
    // Set short timeout for test
    conn.SetDeadline(time.Now().Add(100 * time.Millisecond))
    defer conn.SetDeadline(time.Time)
    
    // Send PING (simple test)
    fmt.Fprintf(conn, "*1\r\n$4\r\nPING\r\n")
    
    // Read response
    reader := bufio.NewReader(conn)
    resp, err := reader.ReadString('\n')
    return err
}

Pooled Client

type PooledClient struct {
    pool    *ConnPool
    timeout time.Duration
}

func NewPooledClient(addr string, maxConns int) *PooledClient {
    return &PooledClient{
        pool:    NewConnPool(addr, maxConns),
        timeout: 5 * time.Second,
    }
}

func (pc *PooledClient) Get(key string) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), pc.timeout)
    defer cancel()
    
    conn, err := pc.pool.Get(ctx)
    if err != nil {
        return "", err
    }
    defer pc.pool.Put(conn)
    
    // Send command and read response
    return pc.sendCommand(conn, "GET", key)
}

func (pc *PooledClient) Close() error {
    return pc.pool.Close()
}

Retry Logic and Exponential Backoff

Problem: Transient failures (temporary network issues)

Attempt 1: Connection refused
Attempt 2 (wait 10ms): Connection refused
Attempt 3 (wait 100ms): Success!
// RetryConfig configures retry behavior
type RetryConfig struct {
    MaxAttempts int
    InitialWait time.Duration
    MaxWait     time.Duration
}

// Retry executes operation with exponential backoff
func Retry(cfg RetryConfig, fn func() error) error {
    var lastErr error
    wait := cfg.InitialWait
    
    for attempt := 0; attempt < cfg.MaxAttempts; attempt++ {
        if attempt > 0 {
            time.Sleep(wait)
            // Exponential backoff: wait = wait * 2, capped at MaxWait
            if wait < cfg.MaxWait {
                wait *= 2
                if wait > cfg.MaxWait {
                    wait = cfg.MaxWait
                }
            }
        }
        
        err := fn()
        if err == nil {
            return nil
        }
        
        // Check if error is retryable
        if !isRetryable(err) {
            return err
        }
        
        lastErr = err
    }
    
    return fmt.Errorf("max retries exceeded: %w", lastErr)
}

// isRetryable determines if error should trigger retry
func isRetryable(err error) bool {
    if err == nil {
        return false
    }
    
    // Network errors are retryable
    if ne, ok := err.(net.Error); ok {
        return ne.Temporary() || ne.Timeout()
    }
    
    // Check error message
    errStr := err.Error()
    retryable := []string{
        "connection refused",
        "connection reset",
        "i/o timeout",
        "temporary failure",
    }
    
    for _, msg := range retryable {
        if strings.Contains(strings.ToLower(errStr), msg) {
            return true
        }
    }
    
    return false
}

Circuit Breaker Pattern

Problem: Retry loop can overwhelm failing server

Server crashes:
  Client retries → Sends traffic to dead server
  More clients retry → Thundering herd
  Server recovers but overloaded

Solution: Circuit Breaker - Stops sending requests when failures exceed threshold

// CircuitBreaker prevents cascading failures
type CircuitBreaker struct {
    maxFailures  int
    resetTimeout time.Duration
    
    failures     int
    lastFailTime time.Time
    state        CircuitState
    mu           sync.RWMutex
}

type CircuitState int

const (
    CircuitClosed CircuitState = iota  // Normal operation
    CircuitOpen                        // Too many failures
    CircuitHalfOpen                    // Testing recovery
)

func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
    return &CircuitBreaker{
        maxFailures:  maxFailures,
        resetTimeout: resetTimeout,
        state:        CircuitClosed,
    }
}

// Do executes operation, managing circuit state
func (cb *CircuitBreaker) Do(fn func() error) error {
    cb.mu.Lock()
    
    // Check if should reset from Open to HalfOpen
    if cb.state == CircuitOpen {
        if time.Since(cb.lastFailTime) > cb.resetTimeout {
            cb.state = CircuitHalfOpen
            cb.failures = 0
        } else {
            cb.mu.Unlock()
            return ErrCircuitOpen
        }
    }
    
    cb.mu.Unlock()
    
    // Execute operation
    err := fn()
    
    cb.mu.Lock()
    defer cb.mu.Unlock()
    
    if err != nil {
        cb.failures++
        cb.lastFailTime = time.Now()
        
        if cb.failures >= cb.maxFailures {
            cb.state = CircuitOpen
            return fmt.Errorf("circuit open after %d failures: %w", cb.failures, err)
        }
        
        return err
    }
    
    // Success: reset circuit
    cb.failures = 0
    if cb.state == CircuitHalfOpen {
        cb.state = CircuitClosed
    }
    
    return nil
}

Lesson 8.2: Advanced Networking Features

Pipelining for Throughput

Problem: Without pipelining:

Client sends command 1  →
                        ← Server sends response 1
Client sends command 2  →
                        ← Server sends response 2
Client sends command 3  →
                        ← Server sends response 3

Round-trip latency: 3 × 10ms = 30ms

Solution: Pipelining sends multiple commands before waiting for responses

Client sends commands 1,2,3 →
                             ← Server sends responses 1,2,3

Latency: 1 × 10ms = 10ms (3x faster!)
// Pipeline sends multiple commands without waiting
type Pipeline struct {
    conn    net.Conn
    writer  *bufio.Writer
    reader  *bufio.Reader
    commands []*Command
}

type Command struct {
    Name string
    Args []string
    Response chan interface
    Error chan error
}

func NewPipeline(conn net.Conn) *Pipeline {
    return &Pipeline{
        conn:     conn,
        writer:   bufio.NewWriter(conn),
        reader:   bufio.NewReader(conn),
        commands: make([]*Command, 0),
    }
}

// Add queues command for execution
func (p *Pipeline) Add(name string, args ...string) <-chan interface {
    cmd := &Command{
        Name:     name,
        Args:     args,
        Response: make(chan interface, 1),
        Error:    make(chan error, 1),
    }
    
    p.commands = append(p.commands, cmd)
    return cmd.Response
}

// Execute sends all queued commands and reads responses
func (p *Pipeline) Execute(ctx context.Context) error {
    // Send all commands
    for _, cmd := range p.commands {
        req := p.formatCommand(cmd)
        if _, err := p.writer.WriteString(req); err != nil {
            return err
        }
    }
    
    if err := p.writer.Flush(); err != nil {
        return err
    }
    
    // Read all responses
    for _, cmd := range p.commands {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        
        resp, err := p.readResponse()
        if err != nil {
            cmd.Error <- err
        } else {
            cmd.Response <- resp
        }
    }
    
    p.commands = p.commands[:0]  // Clear for next pipeline
    return nil
}

Multiplexing Connections

Problem: Multiple goroutines on same connection conflicts

// BAD: Race condition
conn := shared connection
go func() {
    conn.Write(command1)
    conn.Read(response1)
}()
go func() {
    conn.Write(command2)
    conn.Read(response2)
}()
// Commands/responses may interleave!

Solution: Multiplex with request ID matching

// Multiplexer allows concurrent operations on single connection
type Multiplexer struct {
    conn        net.Conn
    requestID   uint64
    responses   map[uint64]chan interface
    mu          sync.Mutex
}

// Request with unique ID
type MuxRequest struct {
    ID    uint64
    Name  string
    Args  []string
}

// Send sends request and returns channel for response
func (m *Multiplexer) Send(name string, args ...string) (<-chan interface, error) {
    m.mu.Lock()
    
    requestID := m.requestID
    m.requestID++
    
    respCh := make(chan interface, 1)
    m.responses[requestID] = respCh
    
    m.mu.Unlock()
    
    // Send request with ID
    req := fmt.Sprintf("%d %s %s\r\n", requestID, name, strings.Join(args, " "))
    _, err := m.conn.Write([]byte(req))
    if err != nil {
        m.mu.Lock()
        delete(m.responses, requestID)
        m.mu.Unlock()
        return nil, err
    }
    
    return respCh, nil
}

Request Timeouts

// Timeout ensures requests don't hang forever
func (pc *PooledClient) GetWithTimeout(key string, timeout time.Duration) (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    
    resultCh := make(chan string, 1)
    errCh := make(chan error, 1)
    
    go func() {
        val, err := pc.Get(key)
        if err != nil {
            errCh <- err
        } else {
            resultCh <- val
        }
    }()
    
    select {
    case result := <-resultCh:
        return result, nil
    case err := <-errCh:
        return "", err
    case <-ctx.Done():
        return "", fmt.Errorf("timeout exceeded")
    }
}

Lesson 8.3: Error Handling

Network Error Types

import "errors"
import "net"

// Classify network errors
func classifyError(err error) string {
    if err == nil {
        return "none"
    }
    
    if errors.Is(err, io.EOF) {
        return "connection_closed"
    }
    
    if errors.Is(err, context.DeadlineExceeded) {
        return "timeout"
    }
    
    if ne, ok := err.(net.Error); ok {
        if ne.Timeout() {
            return "timeout"
        }
        if ne.Temporary() {
            return "temporary"
        }
        return "permanent"
    }
    
    return "unknown"
}

// Common network errors
var (
    ErrConnectionRefused = fmt.Errorf("connection refused")
    ErrConnectionReset   = fmt.Errorf("connection reset")
    ErrTimeout           = fmt.Errorf("operation timeout")
    ErrCircuitOpen       = fmt.Errorf("circuit breaker open")
)

Partial Failure Handling

// Multi-key operation with partial failures
func (pc *PooledClient) MGet(keys ...string) (map[string]string, error) {
    results := make(map[string]string)
    var lastErr error
    
    for _, key := range keys {
        val, err := pc.Get(key)
        if err != nil {
            lastErr = err
            continue
        }
        results[key] = val
    }
    
    // Return partial results even if some failed
    return results, lastErr
}

Client-Side Retries

// Transient vs permanent errors
func isTransient(err error) bool {
    if err == nil {
        return false
    }
    
    if ne, ok := err.(net.Error); ok {
        return ne.Temporary() || ne.Timeout()
    }
    
    // Check for specific errors
    errMsg := err.Error()
    transient := []string{
        "ECONNREFUSED",
        "ETIMEDOUT",
        "ECONNRESET",
        "temporary failure",
    }
    
    for _, msg := range transient {
        if strings.Contains(errMsg, msg) {
            return true
        }
    }
    
    return false
}

// Retry with backoff
func (pc *PooledClient) GetWithRetry(key string, maxRetries int) (string, error) {
    var lastErr error
    
    for attempt := 0; attempt < maxRetries; attempt++ {
        if attempt > 0 {
            backoff := time.Duration(math.Pow(2, float64(attempt-1))) * 100 * time.Millisecond
            time.Sleep(backoff)
        }
        
        val, err := pc.Get(key)
        if err == nil {
            return val, nil
        }
        
        if !isTransient(err) {
            return "", err  // Permanent error, don't retry
        }
        
        lastErr = err
    }
    
    return "", fmt.Errorf("max retries exceeded: %w", lastErr)
}

Lab 8.1: Build Production-Ready Client Library

Objective

Implement a feature-complete, production-grade client library with pooling, retries, and advanced features.

Requirements

  • • Connection Pooling: Configurable pool size, connection reuse, idle cleanup
  • • Retry Logic: Exponential backoff, configurable max attempts, transient error detection
  • • Circuit Breaker: Prevent cascading failures, automatic recovery, state tracking
  • • Pipelining: Batch multiple commands, single flush to server, collect all responses
  • • Error Handling: Proper error types, timeout handling, partial failure support
  • • Testing: Test with real server, test error scenarios, test concurrent operations

Starter Code

package client

import (
    "bufio"
    "context"
    "net"
    "sync"
    "time"
)

// Client is a production-ready database client
type Client struct {
    // TODO: Add connection pool, circuit breaker, retry config
}

// NewClient creates a new client
// TODO: Implement with sensible defaults
func NewClient(addr string, opts ...Option) (*Client, error) {
    return nil, nil
}

// Get retrieves a value with automatic retry and timeout
// TODO: Implement with connection pooling
func (c *Client) Get(key string) (string, error) {
    return "", nil
}

// Set stores a value
// TODO: Implement
func (c *Client) Set(key, value string) error {
    return nil
}

// Delete removes a key
// TODO: Implement
func (c *Client) Delete(key string) error {
    return nil
}

// Pipeline returns a new pipeline for batching
// TODO: Implement
func (c *Client) Pipeline() *Pipeline {
    return nil
}

// Close gracefully closes the client
// TODO: Implement
func (c *Client) Close() error {
    return nil
}

// Option configures client behavior
type Option func(*Client)

// WithPoolSize sets connection pool size
func WithPoolSize(size int) Option {
    return func(c *Client) {
        // TODO: Implement
    }
}

// WithTimeout sets operation timeout
func WithTimeout(t time.Duration) Option {
    return func(c *Client) {
        // TODO: Implement
    }
}

// WithRetry configures retry behavior
func WithRetry(maxAttempts int, initialWait time.Duration) Option {
    return func(c *Client) {
        // TODO: Implement
    }
}

Test Template

func TestClientPooling(t *testing.T) {
    // Start test server
    server := startTestServer()
    defer server.Stop()
    
    // Create client with pool size 5
    client := NewClient(server.Addr(), WithPoolSize(5))
    defer client.Close()
    
    // Concurrent operations
    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func(idx int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", idx)
            client.Set(key, "value")
            val, _ := client.Get(key)
            assert.Equal(t, "value", val)
        }(i)
    }
    
    wg.Wait()
}

func TestClientRetry(t *testing.T) {
    // Use flaky server that fails then succeeds
    server := startFlakyServer(failCount: 2)
    defer server.Stop()
    
    client := NewClient(server.Addr(), WithRetry(5, 10*time.Millisecond))
    
    // Should succeed after retries
    val, err := client.Get("key")
    assert.NoError(t, err)
}

func BenchmarkClientThroughput(b *testing.B) {
    server := startTestServer()
    defer server.Stop()
    
    client := NewClient(server.Addr())
    defer client.Close()
    
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        client.Set("key", "value")
    }
}

Acceptance Criteria

  • ✅ All tests pass
  • ✅ Connection pooling working
  • ✅ Retries succeed with flaky servers
  • ✅ Circuit breaker prevents cascading failures
  • ✅ Pipelining achieves 10x throughput improvement
  • ✅ 100K+ operations/sec on single client
  • ✅ > 85% code coverage
  • ✅ No goroutine leaks

Summary: Week 8 Complete

By completing Week 8, you've learned and implemented:

1. Connection Management

  • • Simple connections
  • • Connection pooling with reuse
  • • Idle connection cleanup
  • • Connection testing

2. Retry Logic

  • • Exponential backoff
  • • Transient error detection
  • • Max retry limits
  • • Backoff strategies

3. Circuit Breaker Pattern

  • • Prevents cascading failures
  • • State management
  • • Automatic recovery
  • • Failure threshold tracking

4. Advanced Features

  • • Pipelining: 10x throughput
  • • Multiplexing: Concurrent ops
  • • Timeouts: Prevent hanging
  • • Keep-alive: Detect dead conns

Metrics Achieved:

  • ✅ 100K+ ops/sec with pooling
  • ✅ 10x throughput with pipelining
  • ✅ Sub-100ms failover with circuit breaker
  • ✅ <1% error rate with retries
  • ✅ Support for 1000+ concurrent clients

Module 4 Complete: Full Client-Server System

Congratulations! You now have a complete client-server system:

✅ Week 7: TCP Server

  • • RESP protocol implementation
  • • 10,000+ concurrent connections
  • • Command execution framework
  • • Graceful shutdown

✅ Week 8: Client Library

  • • Connection pooling (5-10x throughput)
  • • Intelligent retries (99%+ success rate)
  • • Circuit breaking (prevents cascades)
  • • Pipelining (10x throughput)

Ready for Module 5?

Next we'll implement advanced features including transactions with MVCC, replication, and high availability.

Continue to Module 5: Advanced Features →