Lesson 8
Client library & networking
Ergonomic clients, retries, and backpressure.
Course Navigation
Back to course
Learning Objectives
-
Master connection pooling for high-performance clients
-
Implement intelligent retry logic with exponential backoff
-
Build circuit breakers to prevent cascading failures
-
Use pipelining to achieve 10x throughput improvements
-
Handle network errors and timeouts gracefully
-
Create production-ready client libraries
Lesson 8.1: Client Implementation
Connection Management
A client needs to manage connections to the database server efficiently. Let’s start with basic connection handling and evolve to production-grade pooling.
Basic Connection
// Client is a simple client without pooling: it speaks RESP over a single TCP
// connection. Requests and replies share one buffered reader/writer pair, so a
// Client must not be used from several goroutines at once.
type Client struct {
	conn   net.Conn      // underlying connection; released by Close
	reader *bufio.Reader // buffered reader over conn, used to parse replies
	writer *bufio.Writer // buffered writer over conn, used to send commands
}

// NewClient dials addr over TCP and wraps the connection in buffered I/O.
// It returns the dial error unchanged when the connection cannot be opened.
func NewClient(addr string) (*Client, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	return &Client{
		conn:   conn,
		reader: bufio.NewReader(conn),
		writer: bufio.NewWriter(conn),
	}, nil
}

// Get sends a RESP GET for key and returns the stored value.
//
// A null bulk reply ("$-1") yields "" with a nil error. An error reply ("-...")
// is returned as an error carrying the server's message. Any I/O or protocol
// error is returned as-is; after such an error the connection may be out of
// sync with the server and should be closed.
func (c *Client) Get(key string) (string, error) {
	// Send GET command.
	cmd := fmt.Sprintf("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n", len(key), key)
	if _, err := c.writer.WriteString(cmd); err != nil {
		return "", err
	}
	if err := c.writer.Flush(); err != nil {
		return "", err
	}

	// Parse response. Everything is read through c.reader: wrapping it in a
	// second bufio.Reader would let that wrapper swallow bytes that belong to
	// the payload.
	marker, err := c.reader.ReadByte()
	if err != nil {
		return "", err
	}

	switch marker {
	case '-':
		// Error response: the rest of the line is the message.
		line, err := c.reader.ReadString('\n')
		if err != nil {
			return "", err
		}
		// errors.New keeps '%' characters in the server text literal.
		return "", errors.New(strings.TrimSpace(line))

	case '$':
		// Bulk string: "<length>\r\n<payload>\r\n".
		line, err := c.reader.ReadString('\n')
		if err != nil {
			return "", err
		}
		header := strings.TrimSpace(line)
		length, err := strconv.Atoi(header)
		if err != nil {
			return "", fmt.Errorf("invalid bulk length %q: %w", header, err)
		}
		if length == -1 {
			return "", nil // Null
		}
		if length < 0 {
			return "", fmt.Errorf("invalid bulk length %d", length)
		}
		data := make([]byte, length)
		// A single Read may return fewer bytes than requested; ReadFull
		// keeps reading until the whole payload has arrived.
		if _, err := io.ReadFull(c.reader, data); err != nil {
			return "", err
		}
		// Consume the trailing \r\n so the next reply starts cleanly.
		if _, err := c.reader.ReadBytes('\n'); err != nil {
			return "", err
		}
		return string(data), nil

	default:
		return "", fmt.Errorf("unexpected response type: %c", marker)
	}
}

// Close closes the underlying connection.
func (c *Client) Close() error {
	return c.conn.Close()
}
Problem: One connection per client means:
-
Limited concurrency (connection blocks on read)
-
New connection overhead whenever a client is created per operation
-
No connection reuse
Connection Pooling
Connection Pool: Maintain multiple reusable connections
// ConnPool is a pool of connections to a single server. Idle connections are
// parked in a buffered channel and handed back out by Get.
type ConnPool struct {
	addr        string        // server address passed to the dialer
	conns       chan net.Conn // idle connections; capacity is maxConns
	maxConns    int           // maximum number of idle connections kept (not a cap on open connections)
	idleTimeout time.Duration // set by NewConnPool; not consulted by the methods below
	closed      bool          // guarded by mu; once true, Put closes instead of pooling
	mu          sync.Mutex
}

// NewConnPool creates an empty pool for addr that keeps at most maxConns idle
// connections. A negative maxConns is treated as zero (nothing is pooled).
func NewConnPool(addr string, maxConns int) *ConnPool {
	if maxConns < 0 {
		maxConns = 0
	}
	return &ConnPool{
		addr:        addr,
		conns:       make(chan net.Conn, maxConns),
		maxConns:    maxConns,
		idleTimeout: 5 * time.Minute,
	}
}

// Get retrieves a live connection from the pool or dials a new one.
//
// Idle connections are health-checked with testConn; dead ones are closed and
// skipped. It returns ctx.Err() if ctx is already done, an error if the pool
// has been closed, or the dial error. Dialing honours ctx cancellation.
func (cp *ConnPool) Get(ctx context.Context) (net.Conn, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	cp.mu.Lock()
	closed := cp.closed
	cp.mu.Unlock()
	if closed {
		return nil, errors.New("connection pool is closed")
	}

	// Try every idle connection before paying for a new dial.
	for idle := true; idle; {
		select {
		case conn := <-cp.conns:
			if err := cp.testConn(conn); err == nil {
				return conn, nil
			}
			conn.Close()
		default:
			idle = false
		}
	}

	// Create new connection.
	var d net.Dialer
	return d.DialContext(ctx, "tcp", cp.addr)
}

// Put returns conn to the pool. If the pool is closed or already holds
// maxConns idle connections, conn is closed instead and the close error is
// returned. A nil conn is ignored.
func (cp *ConnPool) Put(conn net.Conn) error {
	if conn == nil {
		return nil
	}

	// The lock is held across the (non-blocking) send so that Close cannot
	// drain the pool between the closed check and the send.
	cp.mu.Lock()
	defer cp.mu.Unlock()

	if cp.closed {
		return conn.Close()
	}
	select {
	case cp.conns <- conn:
		return nil
	default:
		// Pool full, close connection.
		return conn.Close()
	}
}

// Close marks the pool closed and closes every idle connection. Connections
// currently checked out are closed when they are handed back to Put. It
// returns the first close error; calling Close again is a no-op.
func (cp *ConnPool) Close() error {
	cp.mu.Lock()
	defer cp.mu.Unlock()

	if cp.closed {
		return nil
	}
	cp.closed = true

	var firstErr error
	for {
		select {
		case conn := <-cp.conns:
			if err := conn.Close(); err != nil && firstErr == nil {
				firstErr = err
			}
		default:
			return firstErr
		}
	}
}

// testConn verifies that conn is still alive by sending PING and waiting up
// to 100ms for one reply line. The deadline is cleared before returning.
func (cp *ConnPool) testConn(conn net.Conn) error {
	// Set short timeout for test.
	if err := conn.SetDeadline(time.Now().Add(100 * time.Millisecond)); err != nil {
		return err
	}
	defer conn.SetDeadline(time.Time{})

	// Send PING (simple test).
	if _, err := fmt.Fprintf(conn, "*1\r\n$4\r\nPING\r\n"); err != nil {
		return err
	}

	// Read response.
	resp, err := bufio.NewReader(conn).ReadString('\n')
	if err != nil {
		return err
	}
	if len(resp) > 0 && resp[0] == '-' {
		return fmt.Errorf("ping rejected: %q", resp)
	}
	return nil
}
Pooled Client
// PooledClient issues commands over connections borrowed from a ConnPool.
type PooledClient struct {
	pool    *ConnPool     // source of connections
	timeout time.Duration // per-operation budget used by Get
}

// NewPooledClient creates a client for addr whose pool keeps up to maxConns
// idle connections. The operation timeout is set to 5 seconds.
func NewPooledClient(addr string, maxConns int) *PooledClient {
	return &PooledClient{
		pool:    NewConnPool(addr, maxConns),
		timeout: 5 * time.Second,
	}
}

// Get fetches the value stored under key.
//
// The whole operation (acquiring a connection, sending, reading) is bounded by
// pc.timeout. A connection is returned to the pool only after a successful
// exchange; after any error it is closed, because a half-read reply would
// corrupt the next request that reused it.
func (pc *PooledClient) Get(key string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), pc.timeout)
	defer cancel()

	conn, err := pc.pool.Get(ctx)
	if err != nil {
		return "", err
	}

	// Apply the remaining budget to the socket so a stalled server cannot
	// block the read past the timeout.
	if deadline, ok := ctx.Deadline(); ok {
		if err := conn.SetDeadline(deadline); err != nil {
			conn.Close()
			return "", err
		}
	}

	// Send command and read response.
	val, err := pc.sendCommand(conn, "GET", key)
	if err != nil {
		conn.Close()
		return "", err
	}

	// Clear the deadline before the connection goes idle in the pool.
	if err := conn.SetDeadline(time.Time{}); err != nil {
		conn.Close()
		return val, nil
	}
	// Put only fails when it had to close the connection; the value already
	// read is still valid, so that error is deliberately not surfaced.
	_ = pc.pool.Put(conn)
	return val, nil
}

// Close closes the underlying connection pool.
func (pc *PooledClient) Close() error {
	return pc.pool.Close()
}
Retry Logic and Exponential Backoff
Problem: Transient failures (temporary network issues)
Attempt 1: Connection refused
Attempt 2 (wait 10ms): Connection refused
Attempt 3 (wait 20ms): Success!
// RetryConfig configures retry behavior.
type RetryConfig struct {
	MaxAttempts int           // total attempts including the first; values below 1 are treated as 1
	InitialWait time.Duration // delay before the second attempt
	MaxWait     time.Duration // ceiling for the doubled delay; if not above InitialWait the delay stays at InitialWait
}

// Retry executes fn with exponential backoff.
//
// fn is always called at least once. After a retryable failure Retry sleeps,
// doubles the wait (capped at cfg.MaxWait) and tries again, up to
// cfg.MaxAttempts calls in total. It returns nil on the first success, the
// error itself as soon as fn fails with a non-retryable error, and otherwise
// the last error wrapped in "max retries exceeded".
func Retry(cfg RetryConfig, fn func() error) error {
	attempts := cfg.MaxAttempts
	if attempts < 1 {
		// A zero-value config must still run the operation once rather than
		// report "max retries exceeded" for something never tried.
		attempts = 1
	}

	var lastErr error
	wait := cfg.InitialWait
	for attempt := 0; attempt < attempts; attempt++ {
		if attempt > 0 {
			time.Sleep(wait)
			// Exponential backoff: wait = wait * 2, capped at MaxWait.
			// Comparing against MaxWait/2 avoids overflowing wait*2.
			if wait < cfg.MaxWait {
				if wait > cfg.MaxWait/2 {
					wait = cfg.MaxWait
				} else {
					wait *= 2
				}
			}
		}

		err := fn()
		if err == nil {
			return nil
		}
		// Check if error is retryable.
		if !isRetryable(err) {
			return err
		}
		lastErr = err
	}
	return fmt.Errorf("max retries exceeded: %w", lastErr)
}

// isRetryable determines if err should trigger a retry.
//
// It reports true for errors (anywhere in the wrap chain) that describe
// themselves as a timeout or as temporary, and for errors whose message
// mentions a refused/reset connection, an I/O timeout or a temporary failure.
func isRetryable(err error) bool {
	if err == nil {
		return false
	}

	// Network errors are retryable when they are timeouts...
	var ne net.Error
	if errors.As(err, &ne) && ne.Timeout() {
		return true
	}
	// ...or when they report themselves as temporary.
	var te interface{ Temporary() bool }
	if errors.As(err, &te) && te.Temporary() {
		return true
	}

	// Check error message. This runs for network errors too: a refused dial
	// is a net.Error that is neither a timeout nor temporary, yet it is the
	// classic transient failure.
	msg := strings.ToLower(err.Error())
	retryable := []string{
		"connection refused",
		"connection reset",
		"i/o timeout",
		"temporary failure",
	}
	for _, fragment := range retryable {
		if strings.Contains(msg, fragment) {
			return true
		}
	}
	return false
}
Circuit Breaker Pattern
Problem: Retry loop can overwhelm failing server
Server crashes:
Client retries → Sends traffic to dead server
More clients retry → Thundering herd
Server recovers but overloaded
Solution: Circuit Breaker - Stops sending requests when failures exceed threshold
// CircuitBreaker prevents cascading failures by rejecting calls once too many
// consecutive failures have been observed.
type CircuitBreaker struct {
	maxFailures  int           // consecutive failures that open the circuit
	resetTimeout time.Duration // how long the circuit stays open before a probe is allowed
	failures     int           // consecutive failures since the last success
	lastFailTime time.Time     // time of the most recent failure
	state        CircuitState
	mu           sync.RWMutex // guards failures, lastFailTime and state
}

// CircuitState is the state of a CircuitBreaker.
type CircuitState int

const (
	CircuitClosed   CircuitState = iota // Normal operation
	CircuitOpen                         // Too many failures
	CircuitHalfOpen                     // Testing recovery
)

// NewCircuitBreaker returns a closed breaker that opens after maxFailures
// consecutive failures and allows a recovery probe once resetTimeout has
// passed since the last failure.
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		maxFailures:  maxFailures,
		resetTimeout: resetTimeout,
		state:        CircuitClosed,
	}
}

// Do executes fn, managing circuit state.
//
// While the circuit is open and resetTimeout has not elapsed, fn is not called
// and ErrCircuitOpen is returned. After the timeout the circuit becomes
// half-open and fn runs as a probe: success closes the circuit, failure
// re-opens it immediately. In the closed state the circuit opens once
// maxFailures consecutive calls have failed. Errors from fn are returned to
// the caller, wrapped when the call caused the circuit to open. fn runs
// without the lock held, so concurrent callers may probe at the same time.
func (cb *CircuitBreaker) Do(fn func() error) error {
	cb.mu.Lock()
	// Check if should reset from Open to HalfOpen.
	if cb.state == CircuitOpen {
		if time.Since(cb.lastFailTime) <= cb.resetTimeout {
			cb.mu.Unlock()
			return ErrCircuitOpen
		}
		cb.state = CircuitHalfOpen
		cb.failures = 0
	}
	cb.mu.Unlock()

	// Execute operation.
	err := fn()

	cb.mu.Lock()
	defer cb.mu.Unlock()

	if err == nil {
		// Success: reset circuit.
		cb.failures = 0
		if cb.state == CircuitHalfOpen {
			cb.state = CircuitClosed
		}
		return nil
	}

	cb.failures++
	cb.lastFailTime = time.Now()

	if cb.state == CircuitHalfOpen {
		// The probe failed: the server has not recovered, so go straight
		// back to open instead of letting maxFailures more calls through.
		cb.state = CircuitOpen
		return fmt.Errorf("circuit re-opened after failed recovery probe: %w", err)
	}
	if cb.failures >= cb.maxFailures {
		cb.state = CircuitOpen
		return fmt.Errorf("circuit open after %d failures: %w", cb.failures, err)
	}
	return err
}
Lesson 8.2: Advanced Networking Features
Pipelining for Throughput
Problem: Without pipelining:
Client sends command 1 →
← Server sends response 1
Client sends command 2 →
← Server sends response 2
Client sends command 3 →
← Server sends response 3
Round-trip latency: 3 × 10ms = 30ms
Solution: Pipelining sends multiple commands before waiting for responses
Client sends commands 1,2,3 →
← Server sends responses 1,2,3
Latency: 1 × 10ms = 10ms (3x faster!)
// Pipeline sends multiple commands without waiting for each reply. Commands
// are queued with Add and sent together by Execute. A Pipeline is not safe
// for concurrent use.
type Pipeline struct {
	conn     net.Conn
	writer   *bufio.Writer
	reader   *bufio.Reader
	commands []*Command // commands queued since the last Execute
}

// Command is one queued pipeline command together with the channels on which
// its outcome is delivered.
type Command struct {
	Name     string
	Args     []string
	Response chan interface{} // receives the reply; closed without a value if the command failed
	Error    chan error       // receives the error if the command failed
}

// NewPipeline creates an empty pipeline over conn.
func NewPipeline(conn net.Conn) *Pipeline {
	return &Pipeline{
		conn:     conn,
		writer:   bufio.NewWriter(conn),
		reader:   bufio.NewReader(conn),
		commands: make([]*Command, 0),
	}
}

// Add queues a command for execution and returns the channel its reply will
// be delivered on. If the command fails, the channel is closed without a
// value, so a receiver never blocks forever.
func (p *Pipeline) Add(name string, args ...string) <-chan interface{} {
	cmd := &Command{
		Name:     name,
		Args:     args,
		Response: make(chan interface{}, 1),
		Error:    make(chan error, 1),
	}
	p.commands = append(p.commands, cmd)
	return cmd.Response
}

// fail reports err for the command and closes Response so that a caller
// holding only the channel returned by Add is released.
func (c *Command) fail(err error) {
	c.Error <- err
	close(c.Response)
}

// Execute sends all queued commands with a single flush and then reads one
// response per command, in order.
//
// The queue is always emptied, even on failure, so an aborted batch is never
// resent by a later Execute. A write failure or a cancelled ctx fails every
// command that has no reply yet and is returned; after that the connection
// may hold unread replies and should be discarded. A failure to read one
// reply is reported on that command only and Execute still returns nil. If
// ctx carries a deadline it is applied to the connection for the call.
func (p *Pipeline) Execute(ctx context.Context) error {
	pending := p.commands
	defer func() { p.commands = p.commands[:0] }() // Clear for next pipeline

	abort := func(cmds []*Command, err error) error {
		for _, cmd := range cmds {
			cmd.fail(err)
		}
		return err
	}

	if err := ctx.Err(); err != nil {
		return abort(pending, err)
	}
	// A non-blocking ctx check cannot interrupt a read that is already
	// blocked; the socket deadline can.
	if deadline, ok := ctx.Deadline(); ok {
		if err := p.conn.SetDeadline(deadline); err != nil {
			return abort(pending, err)
		}
		defer p.conn.SetDeadline(time.Time{})
	}

	// Send all commands.
	for _, cmd := range pending {
		if _, err := p.writer.WriteString(p.formatCommand(cmd)); err != nil {
			return abort(pending, err)
		}
	}
	if err := p.writer.Flush(); err != nil {
		return abort(pending, err)
	}

	// Read all responses.
	for i, cmd := range pending {
		if err := ctx.Err(); err != nil {
			return abort(pending[i:], err)
		}
		resp, err := p.readResponse()
		if err != nil {
			cmd.fail(err)
			continue
		}
		cmd.Response <- resp
	}
	return nil
}
Multiplexing Connections
Problem: Multiple goroutines using the same connection conflict with each other
// BAD: Race condition
// Illustrative pseudo-code: "shared connection" stands for one connection that
// both goroutines below use with no coordination between them.
conn := shared connection
// First goroutine: writes command1, then reads what it takes to be response1.
go func() {
conn.Write(command1)
conn.Read(response1)
}()
// Second goroutine: does the same with command2 on the very same connection.
go func() {
conn.Write(command2)
conn.Read(response2)
}()
// Commands/responses may interleave: nothing ties a Read to the Write issued
// by the same goroutine.
Solution: Multiplex with request ID matching
// Multiplexer allows concurrent operations on a single connection by tagging
// every request with an ID and keeping a response channel per ID.
// The zero value with conn set is ready to use.
type Multiplexer struct {
	conn      net.Conn
	requestID uint64                      // next ID to hand out; guarded by mu
	responses map[uint64]chan interface{} // pending requests by ID; guarded by mu, created on first Send
	mu        sync.Mutex
}

// MuxRequest is a request with a unique ID.
type MuxRequest struct {
	ID   uint64
	Name string
	Args []string
}

// Send writes "<id> <name> <args...>\r\n" to the connection and returns the
// channel registered for that request's response. If the write fails, the
// registration is removed again and the write error is returned.
func (m *Multiplexer) Send(name string, args ...string) (<-chan interface{}, error) {
	respCh := make(chan interface{}, 1)

	m.mu.Lock()
	if m.responses == nil {
		// No constructor initialises the map; writing to a nil map panics.
		m.responses = make(map[uint64]chan interface{})
	}
	requestID := m.requestID
	m.requestID++
	m.responses[requestID] = respCh
	m.mu.Unlock()

	// Send request with ID.
	req := fmt.Sprintf("%d %s %s\r\n", requestID, name, strings.Join(args, " "))
	if _, err := m.conn.Write([]byte(req)); err != nil {
		m.mu.Lock()
		delete(m.responses, requestID)
		m.mu.Unlock()
		return nil, err
	}
	return respCh, nil
}
Request Timeouts
// GetWithTimeout runs Get but stops waiting after timeout, so a request cannot
// hang the caller forever.
//
// On timeout the returned error wraps context.DeadlineExceeded, which lets
// callers detect it with errors.Is. The underlying Get is not interrupted: it
// keeps running in its goroutine until it returns on its own, and its result
// is then dropped.
func (pc *PooledClient) GetWithTimeout(key string, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	type outcome struct {
		val string
		err error
	}
	// Buffered so the goroutine can always deliver and exit, even when
	// nobody is listening any more.
	done := make(chan outcome, 1)
	go func() {
		val, err := pc.Get(key)
		done <- outcome{val: val, err: err}
	}()

	select {
	case out := <-done:
		if out.err != nil {
			return "", out.err
		}
		return out.val, nil
	case <-ctx.Done():
		return "", fmt.Errorf("timeout exceeded: %w", ctx.Err())
	}
}
Lesson 8.3: Error Handling
Network Error Types
import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
)
// classifyError maps err to a coarse category name:
//
//	"none"              err is nil
//	"connection_closed" err is or wraps io.EOF
//	"timeout"           err is or wraps context.DeadlineExceeded, or a net.Error reporting Timeout
//	"temporary"         a net.Error reporting Temporary
//	"permanent"         any other net.Error
//	"unknown"           everything else
//
// Wrapped errors are inspected through the whole chain.
func classifyError(err error) string {
	switch {
	case err == nil:
		return "none"
	case errors.Is(err, io.EOF):
		return "connection_closed"
	case errors.Is(err, context.DeadlineExceeded):
		return "timeout"
	}

	// errors.As finds a net.Error even when a caller has wrapped it with
	// fmt.Errorf("...: %w", err); a plain type assertion would not.
	var ne net.Error
	if !errors.As(err, &ne) {
		return "unknown"
	}
	if ne.Timeout() {
		return "timeout"
	}
	if ne.Temporary() {
		return "temporary"
	}
	return "permanent"
}
// Common network errors. These are sentinel values: compare against them with
// errors.Is rather than by message.
var (
	ErrConnectionRefused = errors.New("connection refused")
	ErrConnectionReset   = errors.New("connection reset")
	ErrTimeout           = errors.New("operation timeout")
	ErrCircuitOpen       = errors.New("circuit breaker open")
)
Partial Failure Handling
// MGet fetches several keys, tolerating partial failures.
//
// The returned map holds every key that was fetched successfully and is never
// nil. If at least one key failed, the error reports how many keys failed and
// wraps the last failure (including its key), so errors.Is / errors.As still
// match the underlying cause. The partial results are returned alongside it.
func (pc *PooledClient) MGet(keys ...string) (map[string]string, error) {
	results := make(map[string]string, len(keys))

	var (
		failed  int
		lastKey string
		lastErr error
	)
	for _, key := range keys {
		val, err := pc.Get(key)
		if err != nil {
			failed++
			lastKey, lastErr = key, err
			continue
		}
		results[key] = val
	}

	if failed == 0 {
		return results, nil
	}
	// Return partial results even if some failed.
	return results, fmt.Errorf("mget: %d of %d keys failed; last failure on key %q: %w",
		failed, len(keys), lastKey, lastErr)
}
Client-Side Retries
// isTransient reports whether err looks like a transient failure that is
// worth retrying, as opposed to a permanent one.
//
// It is true for errors (anywhere in the wrap chain) that are net.Errors
// reporting Timeout or Temporary, and for errors whose message mentions one of
// the known transient conditions. Matching is case-insensitive and covers both
// the errno names and the wording Go uses for them.
func isTransient(err error) bool {
	if err == nil {
		return false
	}

	var ne net.Error
	if errors.As(err, &ne) && (ne.Timeout() || ne.Temporary()) {
		return true
	}

	// Check for specific errors. This also runs for network errors: a
	// refused or reset connection is a net.Error that is neither a timeout
	// nor temporary.
	msg := strings.ToLower(err.Error())
	transient := []string{
		"econnrefused",
		"etimedout",
		"econnreset",
		"temporary failure",
		"connection refused",
		"connection reset",
		"i/o timeout",
	}
	for _, fragment := range transient {
		if strings.Contains(msg, fragment) {
			return true
		}
	}
	return false
}
// GetWithRetry calls Get up to maxRetries times, backing off between attempts.
//
// Get is always tried at least once, even when maxRetries is zero or negative.
// The wait before a retry starts at 100ms and doubles each time, capped at 30
// seconds. A non-transient error is returned immediately without retrying;
// when every attempt fails with a transient error, the last one is returned
// wrapped in "max retries exceeded".
func (pc *PooledClient) GetWithRetry(key string, maxRetries int) (string, error) {
	const (
		baseBackoff = 100 * time.Millisecond
		maxBackoff  = 30 * time.Second
		// Largest shift applied to baseBackoff; far below the point where
		// the shifted duration could overflow.
		maxShift = 16
	)

	attempts := maxRetries
	if attempts < 1 {
		attempts = 1
	}

	var lastErr error
	for attempt := 0; attempt < attempts; attempt++ {
		if attempt > 0 {
			backoff := maxBackoff
			if shift := uint(attempt - 1); shift < maxShift {
				if b := baseBackoff << shift; b < maxBackoff {
					backoff = b
				}
			}
			time.Sleep(backoff)
		}

		val, err := pc.Get(key)
		if err == nil {
			return val, nil
		}
		if !isTransient(err) {
			return "", err // Permanent error, don't retry
		}
		lastErr = err
	}
	return "", fmt.Errorf("max retries exceeded: %w", lastErr)
}
Lab 8.1: Build Production-Ready Client Library
Objective
Implement a feature-complete, production-grade client library with pooling, retries, and advanced features.
Requirements
-
Connection Pooling: Configurable pool size, connection reuse, idle cleanup
-
Retry Logic: Exponential backoff, configurable max attempts, transient error detection
-
Circuit Breaker: Prevent cascading failures, automatic recovery, state tracking
-
Pipelining: Batch multiple commands, single flush to server, collect all responses
-
Error Handling: Proper error types, timeout handling, partial failure support
-
Testing: Test with real server, test error scenarios, test concurrent operations
Starter Code
package client
import (
"bufio"
"context"
"net"
"sync"
"time"
)
// Client is a production-ready database client.
// Starter scaffold: the struct has no fields yet and every method below is a
// placeholder that returns zero values until the TODOs are implemented.
type Client struct {
// TODO: Add connection pool, circuit breaker, retry config
}
// NewClient creates a new client for addr, configured by opts.
// As written it returns (nil, nil) and ignores both arguments.
// TODO: Implement with sensible defaults
func NewClient(addr string, opts ...Option) (*Client, error) {
return nil, nil
}
// Get retrieves a value with automatic retry and timeout.
// As written it returns ("", nil) for every key.
// TODO: Implement with connection pooling
func (c *Client) Get(key string) (string, error) {
return "", nil
}
// Set stores a value under key.
// As written it does nothing and returns nil.
// TODO: Implement
func (c *Client) Set(key, value string) error {
return nil
}
// Delete removes a key.
// As written it does nothing and returns nil.
// TODO: Implement
func (c *Client) Delete(key string) error {
return nil
}
// Pipeline returns a new pipeline for batching.
// As written it returns nil, so callers must not use the result yet.
// TODO: Implement
func (c *Client) Pipeline() *Pipeline {
return nil
}
// Close gracefully closes the client.
// As written it does nothing and returns nil.
// TODO: Implement
func (c *Client) Close() error {
return nil
}
// Option configures client behavior. It is a function that receives the
// Client being configured; NewClient accepts any number of them.
type Option func(*Client)
// WithPoolSize sets connection pool size.
// The returned Option is currently a no-op: size is not stored anywhere yet.
func WithPoolSize(size int) Option {
return func(c *Client) {
// TODO: Implement
}
}
// WithTimeout sets operation timeout.
// The returned Option is currently a no-op: t is not stored anywhere yet.
func WithTimeout(t time.Duration) Option {
return func(c *Client) {
// TODO: Implement
}
}
// WithRetry configures retry behavior: the maximum number of attempts and the
// initial wait between them.
// The returned Option is currently a no-op: neither value is stored yet.
func WithRetry(maxAttempts int, initialWait time.Duration) Option {
return func(c *Client) {
// TODO: Implement
}
}
Test Template
// TestClientPooling runs 100 concurrent Set/Get round trips through a client
// limited to a pool of 5 connections and checks every value read back.
func TestClientPooling(t *testing.T) {
	// Start test server.
	server := startTestServer()
	defer server.Stop()

	// Create client with pool size 5. NewClient returns (client, error).
	client, err := NewClient(server.Addr(), WithPoolSize(5))
	if err != nil {
		t.Fatalf("NewClient: %v", err)
	}
	defer client.Close()

	// Concurrent operations.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			key := fmt.Sprintf("key-%d", idx)
			// t.Errorf (not Fatalf) because this is not the test goroutine.
			if err := client.Set(key, "value"); err != nil {
				t.Errorf("Set(%q): %v", key, err)
				return
			}
			val, err := client.Get(key)
			if err != nil {
				t.Errorf("Get(%q): %v", key, err)
				return
			}
			assert.Equal(t, "value", val)
		}(i)
	}
	wg.Wait()
}
func TestClientRetry(t *testing.T) {
// Use flaky server that fails then succeeds
server := startFlakyServer(failCount: 2)
defer server.Stop()
client := NewClient(server.Addr(), WithRetry(5, 10*time.Millisecond))
// Should succeed after retries
val, err := client.Get("key")
assert.NoError(t, err)
}
// BenchmarkClientThroughput measures sequential Set calls on one client.
// Server and client setup happen before the timer is reset.
func BenchmarkClientThroughput(b *testing.B) {
	server := startTestServer()
	defer server.Stop()

	client, err := NewClient(server.Addr())
	if err != nil {
		b.Fatalf("NewClient: %v", err)
	}
	defer client.Close()

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// A failing Set would make the measured throughput meaningless.
		if err := client.Set("key", "value"); err != nil {
			b.Fatalf("Set: %v", err)
		}
	}
}
Acceptance Criteria
-
✅ All tests pass
-
✅ Connection pooling working
-
✅ Retries succeed with flaky servers
-
✅ Circuit breaker prevents cascading failures
-
✅ Pipelining achieves 10x throughput improvement
-
✅ 100K+ operations/sec on single client
-
✅ > 85% code coverage
-
✅ No goroutine leaks
Summary: Week 8 Complete
By completing Week 8, you’ve learned and implemented:
1. Connection Management
-
Simple connections
-
Connection pooling with reuse
-
Idle connection cleanup
-
Connection testing
2. Retry Logic
-
Exponential backoff
-
Transient error detection
-
Max retry limits
-
Backoff strategies
3. Circuit Breaker Pattern
-
Prevents cascading failures
-
State management
-
Automatic recovery
-
Failure threshold tracking
4. Advanced Features
-
Pipelining: 10x throughput
-
Multiplexing: Concurrent ops
-
Timeouts: Prevent hanging
-
Keep-alive: Detect dead conns
Metrics Achieved:
-
✅ 100K+ ops/sec with pooling
-
✅ 10x throughput with pipelining
-
✅ Sub-100ms failover with circuit breaker
-
✅ <1% error rate with retries
-
✅ Support for 1000+ concurrent clients
Module 4 Complete: Full Client-Server System
Congratulations! You now have a complete client-server system:
✅ Week 7: TCP Server
-
RESP protocol implementation
-
10,000+ concurrent connections
-
Command execution framework
-
Graceful shutdown
✅ Week 8: Client Library
-
Connection pooling (5-10x throughput)
-
Intelligent retries (99%+ success rate)
-
Circuit breaking (prevents cascades)
-
Pipelining (10x throughput)
Ready for Module 5?
Next we’ll implement advanced features including transactions with MVCC, replication, and high availability.
Continue to Module 5: Advanced Features →