
Caching & Performance

ElastiCache Redis and Memcached, DAX for DynamoDB, and caching strategies

AWS Caching & Performance Optimization

Master AWS caching strategies with free flashcards and spaced repetition practice. This lesson covers Amazon CloudFront, ElastiCache, DAX (DynamoDB Accelerator), and performance optimization patterns: essential concepts for building high-performance, cost-effective cloud applications.

Welcome to AWS Caching & Performance 🚀

Caching is one of the most powerful tools in your AWS arsenal for improving application performance and reducing costs. By storing frequently accessed data closer to your users or application layer, you can dramatically reduce latency, decrease database load, and improve the overall user experience. In this comprehensive lesson, you'll learn how to implement multiple layers of caching using AWS services like CloudFront, ElastiCache (Redis and Memcached), and DynamoDB Accelerator (DAX).

Whether you're preparing for AWS certification exams or building production systems, understanding when and how to use each caching service is crucial for architecting scalable, performant applications.

Core Concepts: The AWS Caching Hierarchy 🎯

Understanding the Caching Pyramid

AWS provides caching capabilities at multiple layers of your application stack. Think of it like a pyramid; the closer data is to the user, the faster the response time:

         👤 User
           |
     ┌─────┴──────┐
     │ CloudFront │  ← Edge caching (global)
     └─────┬──────┘
           |
     ┌─────┴──────┐
     │    ALB     │  ← Load balancer
     └─────┬──────┘
           |
     ┌─────┴───────┐
     │ ElastiCache │  ← In-memory caching
     └─────┬───────┘
           |
     ┌─────┴──────┐
     │    DAX     │  ← DynamoDB acceleration
     └─────┬──────┘
           |
     ┌─────┴──────┐
     │  Database  │  ← Primary data store
     └────────────┘

1. Amazon CloudFront: Edge Caching 🌍

CloudFront is AWS's Content Delivery Network (CDN) that caches content at edge locations worldwide. It's your first line of defense against slow load times for global users.

Key Features:

  • Edge Locations: 400+ points of presence globally
  • Origin Types: S3, EC2, ALB, API Gateway, custom origins
  • Cache Behaviors: Different rules for different URL patterns
  • TTL Control: Time-to-Live settings (seconds to 1 year)
  • Invalidation: Force cache refresh when needed (see the boto3 sketch after this list)
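
Invalidations can be triggered from code as well as the console. Here is a minimal boto3 sketch; the distribution ID and path pattern are placeholders, and note that AWS bills invalidation paths beyond a monthly free allowance.

# Sketch: force CloudFront to refetch cached objects from the origin
import time

import boto3

cloudfront = boto3.client('cloudfront')

response = cloudfront.create_invalidation(
    DistributionId='E1ABCDEF234567',         # hypothetical distribution ID
    InvalidationBatch={
        'Paths': {
            'Quantity': 1,
            'Items': ['/images/*']           # invalidate all cached images
        },
        'CallerReference': str(time.time())  # must be unique per request
    }
)
print(response['Invalidation']['Status'])    # e.g. 'InProgress'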

💡 Use CloudFront when:

  • Serving static content (images, CSS, JavaScript, videos)
  • Distributing APIs globally
  • Reducing latency for international users
  • Protecting origins from DDoS attacks (with AWS Shield)

How CloudFront Works:

┌─────────────────────────────────────────────┐
│           CLOUDFRONT REQUEST FLOW           │
└─────────────────────────────────────────────┘

1. User requests content
        ↓
2. DNS routes to nearest edge location
        ↓
3. Edge location checks cache
        ↓
   ┌────┴────┐
   ↓         ↓
 Cache     Cache
  HIT      MISS
   │         │
   │         ↓
   │    Request from origin
   │         │
   │         ↓
   │    Cache response
   │         │
   └────┬────┘
        ↓
4. Return content to user

Cache-Control Headers:

CloudFront respects HTTP cache headers from your origin:

# Python Flask example
import json

from flask import Flask, Response

app = Flask(__name__)

@app.route('/api/products')
def get_products():
    data = get_product_data()  # placeholder for your data-access layer
    response = Response(json.dumps(data), mimetype='application/json')
    # Cache for 5 minutes at CloudFront
    response.headers['Cache-Control'] = 'max-age=300, public'
    return response

Cache Behavior Patterns:

Pattern   | Use Case        | TTL
/images/* | Static images   | 1 year
/api/*    | API responses   | 5-60 seconds
/videos/* | Video streaming | 1 day
/*.html   | Web pages       | 5 minutes

🧠 Memory Device: "CloudFront = Cloud in FRONT of your origin" - It sits between users and your servers, intercepting requests.

2. Amazon ElastiCache: In-Memory Caching 💾

ElastiCache provides managed Redis and Memcached clusters for blazing-fast in-memory data access. It sits between your application and database, dramatically reducing database load.

Redis vs Memcached Comparison:

Feature         | Redis                                      | Memcached
Data Structures | Strings, Lists, Sets, Hashes, Sorted Sets  | Strings only
Persistence     | Optional (RDB, AOF)                        | None
Replication     | Master-replica                             | None
Transactions    | Yes                                        | No
Pub/Sub         | Yes                                        | No
Multi-threading | No (single-threaded)                       | Yes
Use Case        | Complex caching, sessions, leaderboards    | Simple object caching

💡 Choose Redis when:

  • You need data persistence
  • Working with complex data types
  • Implementing leaderboards, counters, or real-time analytics
  • Need pub/sub messaging
  • Require automatic failover

💡 Choose Memcached when:

  • Simple key-value caching
  • Need multi-threaded performance
  • Don't require persistence
  • Horizontal scaling with multiple nodes (see the pymemcache sketch below)
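
The examples in this lesson use Redis, but the Memcached side looks very similar. Here is a minimal cache-aside sketch using the third-party pymemcache client; the endpoint and the db.query_user call are placeholders, and ElastiCache Memcached listens on port 11211 by default.

# Minimal Memcached cache-aside sketch with pymemcache (assumed client library)
import json

from pymemcache.client.base import Client

mc = Client(('my-memcached.cache.amazonaws.com', 11211))

def get_user(user_id):
    cache_key = f"user:{user_id}"

    # Try cache first
    cached = mc.get(cache_key)
    if cached:
        return json.loads(cached)

    # Cache miss - query database (placeholder call)
    user = db.query_user(user_id)

    # Store in cache for 1 hour
    mc.set(cache_key, json.dumps(user), expire=3600)
    return user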

Redis Data Structure Examples:

# Python with redis-py
import redis
import json

# Connect to ElastiCache Redis
cache = redis.Redis(
    host='my-cluster.cache.amazonaws.com',
    port=6379,
    decode_responses=True
)

# String caching (most common)
def get_user(user_id):
    cache_key = f"user:{user_id}"
    
    # Try cache first
    cached = cache.get(cache_key)
    if cached:
        return json.loads(cached)
    
    # Cache miss - query database
    user = db.query_user(user_id)
    
    # Store in cache for 1 hour
    cache.setex(cache_key, 3600, json.dumps(user))
    return user

# Hash for structured data
cache.hset("user:123", mapping={
    "name": "Alice",
    "email": "alice@example.com",
    "age": "30"
})

# Sorted set for leaderboards
cache.zadd("game:leaderboard", {
    "player1": 1500,
    "player2": 2300,
    "player3": 1800
})

# Get top 10 players
top_players = cache.zrevrange("game:leaderboard", 0, 9, withscores=True)

Cache-Aside Pattern (Lazy Loading):

This is the most common caching pattern:

┌──────────────────────────────────────┐
│         CACHE-ASIDE PATTERN          │
└──────────────────────────────────────┘

  Application requests data
          ↓
     Check cache
          ↓
     ┌────┴────┐
     ↓         ↓
   Found    Not Found
  (Cache     (Cache
    Hit)      Miss)
     │         │
     │         ↓
     │    Query database
     │         │
     │         ↓
     │    Write to cache
     │         │
     └────┬────┘
          ↓
     Return data

Write-Through Pattern:

def update_user(user_id, data):
    # Write to database first
    db.update_user(user_id, data)
    
    # Then update cache
    cache_key = f"user:{user_id}"
    cache.setex(cache_key, 3600, json.dumps(data))
    
    return data

⚠️ Common Cache Invalidation Strategies:

  1. TTL-based: Set expiration times
  2. Event-based: Invalidate on data changes
  3. Version-based: Include version in cache key
  4. Manual: Explicit cache clearing

# Version-based cache keys
def get_product(product_id, version):
    cache_key = f"product:{product_id}:v{version}"
    # Old versions naturally expire via TTL
    return cache.get(cache_key)

3. DynamoDB Accelerator (DAX): Microsecond Latency ⚡

DAX is an in-memory cache specifically designed for DynamoDB, providing microsecond response times for eventually-consistent reads.

Key Characteristics:

  • Latency: Single-digit milliseconds → microseconds
  • API Compatibility: Drop-in replacement for DynamoDB SDK
  • Write-through: Writes go through DAX to DynamoDB
  • Item Cache: Individual items (TTL: 5 minutes default)
  • Query Cache: Query and Scan results (TTL: 5 minutes default)

DAX Architecture:

┌─────────────────────────────────────────┐
│               DAX CLUSTER               │
├─────────────────────────────────────────┤
│                                         │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  │
│  │ Primary │  │ Replica │  │ Replica │  │
│  │  Node   │  │  Node   │  │  Node   │  │
│  └────┬────┘  └────┬────┘  └────┬────┘  │
│       │            │            │       │
└───────┼────────────┼────────────┼───────┘
        └────────────┴────────────┘
                 ↓
        ┌────────────────┐
        │    DynamoDB    │
        │     Table      │
        └────────────────┘

Using DAX with Python:

import boto3
from amazondax import AmazonDaxClient

# Without DAX (standard DynamoDB)
dynamodb = boto3.client('dynamodb', region_name='us-east-1')

# With DAX (same API!)
dax = AmazonDaxClient(endpoint_url='my-dax-cluster.cache.amazonaws.com:8111')

# Query with DAX caching
response = dax.query(
    TableName='Products',
    KeyConditionExpression='category = :cat',
    ExpressionAttributeValues={
        ':cat': {'S': 'electronics'}
    }
)

# GetItem with DAX
item = dax.get_item(
    TableName='Users',
    Key={'userId': {'S': 'user123'}}
)

💡 Use DAX when:

  • Read-heavy DynamoDB workloads
  • Need sub-millisecond latency
  • Eventually-consistent reads are acceptable
  • Repeated reads of same items
  • Want transparent caching (no code changes)

⚠️ Don't use DAX for:

  • Strongly-consistent reads (not supported)
  • Write-heavy workloads (no write caching benefit)
  • Infrequent, varied queries
  • Batch operations (limited benefit)

DAX vs ElastiCache for DynamoDB:

Aspect      | DAX                          | ElastiCache
Integration | Drop-in DynamoDB replacement | Requires code changes
Latency     | Microseconds                 | Sub-millisecond
Flexibility | DynamoDB only                | Any data source
Cache Logic | Automatic                    | Manual control
Complexity  | Lower                        | Higher

4. Application-Level Caching Strategies 🔧

Multi-Tier Caching Architecture:

import json

import boto3
import redis

class MultiTierCache:
    def __init__(self):
        self.local_cache = {}  # In-process cache
        self.redis = redis.Redis(host='elasticache-endpoint')
        self.dynamodb = boto3.resource('dynamodb')
    
    def get_product(self, product_id):
        # L1: Local cache (fastest)
        if product_id in self.local_cache:
            return self.local_cache[product_id]
        
        # L2: Redis cache
        cache_key = f"product:{product_id}"
        cached = self.redis.get(cache_key)
        if cached:
            product = json.loads(cached)
            self.local_cache[product_id] = product  # Populate L1
            return product
        
        # L3: Database (slowest)
        table = self.dynamodb.Table('Products')
        response = table.get_item(Key={'productId': product_id})
        product = response.get('Item')
        
        if product:
            # Populate both caches
            self.redis.setex(cache_key, 300, json.dumps(product))
            self.local_cache[product_id] = product
        
        return product

Cache Warming:

Pre-populate cache before traffic arrives:

import json

import boto3
import redis

def warm_cache():
    """Run during deployment or scheduled maintenance"""
    cache = redis.Redis(host='elasticache-endpoint')
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('Products')
    
    # Get most popular products
    response = table.scan(
        FilterExpression='popularity > :threshold',
        ExpressionAttributeValues={':threshold': 1000}
    )
    
    # Load into cache
    for item in response['Items']:
        cache_key = f"product:{item['productId']}"
        cache.setex(cache_key, 3600, json.dumps(item))
    
    print(f"Warmed {len(response['Items'])} items")

Cache Stampede Prevention:

When cache expires, multiple requests hit database simultaneously. Solution:

import json
import time

class CacheWithLock:
    def __init__(self, redis_client):
        self.cache = redis_client
        self.locks = {}
    
    def get_with_lock(self, key, fetch_func, ttl=300):
        # Try cache
        cached = self.cache.get(key)
        if cached:
            return json.loads(cached)
        
        # Acquire lock for this key
        lock_key = f"lock:{key}"
        lock_acquired = self.cache.set(lock_key, "1", nx=True, ex=10)
        
        if lock_acquired:
            try:
                # This thread fetches data
                data = fetch_func()
                self.cache.setex(key, ttl, json.dumps(data))
                return data
            finally:
                self.cache.delete(lock_key)
        else:
            # Wait for other thread to populate cache
            time.sleep(0.1)
            return self.get_with_lock(key, fetch_func, ttl)

Detailed Examples with Explanations 📝

Example 1: Building a Cached API with CloudFront + ElastiCache

Scenario: E-commerce product catalog API with global users.

Architecture:

┌──────────┐      ┌────────────┐      ┌───────────┐
│  Users   │ ───→ │ CloudFront │ ───→ │    ALB    │
│  Global  │      │   (CDN)    │      │           │
└──────────┘      └────────────┘      └─────┬─────┘
                                            ↓
                                     ┌──────────────┐
                                     │   Lambda/    │
                                     │   EC2 App    │
                                     └──────┬───────┘
                                            ↓
                                     ┌──────────────┐
                                     │ ElastiCache  │
                                     │   (Redis)    │
                                     └──────┬───────┘
                                            ↓
                                     ┌──────────────┐
                                     │     RDS      │
                                     │  (Postgres)  │
                                     └──────────────┘

Implementation:

import json
import redis
import psycopg2
from flask import Flask, jsonify, request

app = Flask(__name__)

# Redis connection
cache = redis.Redis(
    host='my-cluster.xxxxx.cache.amazonaws.com',
    port=6379,
    decode_responses=True,
    socket_connect_timeout=5
)

# Database connection
db = psycopg2.connect(
    host='my-db.xxxxx.rds.amazonaws.com',
    database='products',
    user='admin',
    password='secret'
)

@app.route('/api/products/<product_id>')
def get_product(product_id):
    cache_key = f"product:{product_id}"
    
    # Try Redis cache
    try:
        cached = cache.get(cache_key)
        if cached:
            product = json.loads(cached)
            return jsonify({
                'product': product,
                'source': 'cache'
            }), 200, {
                'Cache-Control': 'public, max-age=60',  # CloudFront caches 60s
                'X-Cache': 'Hit'
            }
    except redis.RedisError as e:
        print(f"Redis error: {e}")
        # Continue to database on cache failure
    
    # Cache miss - query database
    cursor = db.cursor()
    cursor.execute(
        "SELECT id, name, price, description FROM products WHERE id = %s",
        (product_id,)
    )
    row = cursor.fetchone()
    
    if not row:
        return jsonify({'error': 'Product not found'}), 404
    
    product = {
        'id': row[0],
        'name': row[1],
        'price': float(row[2]),
        'description': row[3]
    }
    
    # Store in Redis (15 minute TTL)
    try:
        cache.setex(cache_key, 900, json.dumps(product))
    except redis.RedisError as e:
        print(f"Failed to cache: {e}")
    
    return jsonify({
        'product': product,
        'source': 'database'
    }), 200, {
        'Cache-Control': 'public, max-age=60',
        'X-Cache': 'Miss'
    }

@app.route('/api/products')
def list_products():
    category = request.args.get('category', 'all')
    page = int(request.args.get('page', 1))
    
    cache_key = f"products:list:{category}:page:{page}"
    
    # Try cache
    cached = cache.get(cache_key)
    if cached:
        return cached, 200, {'Cache-Control': 'public, max-age=120'}
    
    # Query database
    cursor = db.cursor()
    offset = (page - 1) * 20
    
    if category == 'all':
        cursor.execute(
            "SELECT id, name, price FROM products LIMIT 20 OFFSET %s",
            (offset,)
        )
    else:
        cursor.execute(
            "SELECT id, name, price FROM products WHERE category = %s LIMIT 20 OFFSET %s",
            (category, offset)
        )
    
    products = [
        {'id': row[0], 'name': row[1], 'price': float(row[2])}
        for row in cursor.fetchall()
    ]
    
    response_data = json.dumps({'products': products, 'page': page})
    
    # Cache for 5 minutes
    cache.setex(cache_key, 300, response_data)
    
    return response_data, 200, {'Cache-Control': 'public, max-age=120'}

Why this works:

  1. CloudFront caches at edge (60s TTL) - users get instant responses
  2. Redis caches database queries (5-15 min TTL) - reduces DB load
  3. Graceful degradation - if cache fails, app still works
  4. Cache-Control headers - tell CloudFront how long to cache

🔧 Try this: Calculate cache hit ratio:

def get_cache_stats():
    info = cache.info('stats')
    hits = info['keyspace_hits']
    misses = info['keyspace_misses']
    total = hits + misses
    
    if total > 0:
        hit_rate = (hits / total) * 100
        return f"Cache hit rate: {hit_rate:.2f}%"
    return "No cache activity yet"

Example 2: Session Store with Redis

Scenario: Store user sessions across multiple web servers.

import redis
import secrets
from datetime import datetime

class SessionStore:
    def __init__(self, redis_host):
        self.cache = redis.Redis(
            host=redis_host,
            decode_responses=True
        )
        self.session_ttl = 3600  # 1 hour
    
    def create_session(self, user_id, user_data):
        """Create new session and return session token"""
        session_token = secrets.token_urlsafe(32)
        session_key = f"session:{session_token}"
        
        session_data = {
            'user_id': user_id,
            'created_at': str(datetime.utcnow()),
            **user_data
        }
        
        # Store as Redis hash
        self.cache.hset(session_key, mapping=session_data)
        self.cache.expire(session_key, self.session_ttl)
        
        return session_token
    
    def get_session(self, session_token):
        """Retrieve session data"""
        session_key = f"session:{session_token}"
        session_data = self.cache.hgetall(session_key)
        
        if session_data:
            # Extend session TTL on access
            self.cache.expire(session_key, self.session_ttl)
            return session_data
        
        return None
    
    def delete_session(self, session_token):
        """Logout - delete session"""
        session_key = f"session:{session_token}"
        self.cache.delete(session_key)
    
    def update_session(self, session_token, updates):
        """Update session data"""
        session_key = f"session:{session_token}"
        if self.cache.exists(session_key):
            self.cache.hset(session_key, mapping=updates)
            self.cache.expire(session_key, self.session_ttl)
            return True
        return False

# Usage
session_store = SessionStore('my-redis.cache.amazonaws.com')

# User logs in
token = session_store.create_session(
    user_id='12345',
    user_data={'email': 'user@example.com', 'role': 'admin'}
)

# Subsequent requests
session = session_store.get_session(token)
if session:
    print(f"User {session['email']} is logged in")

Why Redis for sessions:

  • Shared across servers - any web server can access
  • Automatic expiration - TTL cleans up old sessions
  • Fast access - sub-millisecond reads
  • Atomic operations - thread-safe updates

Example 3: Real-Time Leaderboard with Redis Sorted Sets

Scenario: Gaming leaderboard with millions of players.

import redis
from typing import List, Tuple

class GameLeaderboard:
    def __init__(self, redis_host):
        self.cache = redis.Redis(host=redis_host, decode_responses=True)
        self.leaderboard_key = "game:global:leaderboard"
    
    def update_score(self, player_id: str, score: int):
        """Add or update player score"""
        # ZADD is O(log(N)) - very efficient
        self.cache.zadd(self.leaderboard_key, {player_id: score})
    
    def increment_score(self, player_id: str, points: int):
        """Add points to existing score"""
        # ZINCRBY is atomic
        new_score = self.cache.zincrby(self.leaderboard_key, points, player_id)
        return int(new_score)
    
    def get_top_players(self, count: int = 10) -> List[Tuple[str, int]]:
        """Get top N players"""
        # ZREVRANGE with scores - O(log(N)+M)
        players = self.cache.zrevrange(
            self.leaderboard_key,
            0,
            count - 1,
            withscores=True
        )
        return [(player, int(score)) for player, score in players]
    
    def get_player_rank(self, player_id: str) -> int:
        """Get player's global rank (1-indexed)"""
        # ZREVRANK - O(log(N))
        rank = self.cache.zrevrank(self.leaderboard_key, player_id)
        return rank + 1 if rank is not None else None
    
    def get_player_score(self, player_id: str) -> int:
        """Get player's current score"""
        score = self.cache.zscore(self.leaderboard_key, player_id)
        return int(score) if score else 0
    
    def get_nearby_players(self, player_id: str, range_size: int = 5):
        """Get players ranked near the given player"""
        rank = self.cache.zrevrank(self.leaderboard_key, player_id)
        if rank is None:
            return []
        
        start = max(0, rank - range_size)
        end = rank + range_size
        
        players = self.cache.zrevrange(
            self.leaderboard_key,
            start,
            end,
            withscores=True
        )
        return [(player, int(score)) for player, score in players]

# Usage example
leaderboard = GameLeaderboard('my-redis.cache.amazonaws.com')

# Players finish games
leaderboard.update_score('player_alice', 1500)
leaderboard.update_score('player_bob', 2300)
leaderboard.increment_score('player_alice', 200)  # Alice gains 200 points

# Get top 10
top_10 = leaderboard.get_top_players(10)
for i, (player, score) in enumerate(top_10, 1):
    print(f"{i}. {player}: {score}")

# Check specific player
alice_rank = leaderboard.get_player_rank('player_alice')
print(f"Alice is ranked #{alice_rank}")

# See nearby competitors
nearby = leaderboard.get_nearby_players('player_alice', 3)

Why Sorted Sets are perfect:

  • O(log N) operations - blazing fast even with millions of entries
  • Automatic sorting - no manual sort needed
  • Range queries - get top N or nearby players efficiently
  • Atomic updates - thread-safe score changes

Example 4: DynamoDB + DAX for High-Traffic Reads

Scenario: News article platform with viral content.

import boto3
from amazondax import AmazonDaxClient
from datetime import datetime

class ArticleService:
    def __init__(self, use_dax=True):
        if use_dax:
            # DAX client (same API as DynamoDB)
            self.client = AmazonDaxClient(
                endpoint_url='my-dax-cluster.cache.amazonaws.com:8111'
            )
        else:
            # Standard DynamoDB
            self.client = boto3.client('dynamodb', region_name='us-east-1')
        
        self.table_name = 'Articles'
    
    def get_article(self, article_id: str):
        """Get single article - benefits from DAX item cache"""
        response = self.client.get_item(
            TableName=self.table_name,
            Key={'articleId': {'S': article_id}}
        )
        return response.get('Item')
    
    def get_trending_articles(self, category: str, limit: int = 20):
        """Query trending articles - benefits from DAX query cache"""
        response = self.client.query(
            TableName=self.table_name,
            IndexName='CategoryTrendingIndex',
            KeyConditionExpression='category = :cat',
            ExpressionAttributeValues={
                ':cat': {'S': category}
            },
            Limit=limit,
            ScanIndexForward=False  # Descending order
        )
        return response.get('Items', [])
    
    def increment_view_count(self, article_id: str):
        """Update view count - writes go through DAX to DynamoDB"""
        self.client.update_item(
            TableName=self.table_name,
            Key={'articleId': {'S': article_id}},
            UpdateExpression='ADD viewCount :inc',
            ExpressionAttributeValues={
                ':inc': {'N': '1'}
            }
        )

# Compare performance
import time

# Without DAX
service_no_dax = ArticleService(use_dax=False)
start = time.time()
for i in range(100):
    service_no_dax.get_article('article-123')
no_dax_time = time.time() - start
print(f"Without DAX: {no_dax_time:.3f}s for 100 reads")

# With DAX
service_with_dax = ArticleService(use_dax=True)
start = time.time()
for i in range(100):
    service_with_dax.get_article('article-123')
dax_time = time.time() - start
print(f"With DAX: {dax_time:.3f}s for 100 reads")
print(f"Speedup: {no_dax_time / dax_time:.1f}x faster")

DAX Performance Impact:

  • First read: ~5-10ms (cache miss, queries DynamoDB)
  • Subsequent reads: ~0.5-1ms (cache hit, from DAX)
  • 10-20x faster for repeated reads

Common Mistakes & How to Avoid Them ⚠️

Mistake 1: Not Setting Appropriate TTLs

❌ Wrong:

# No TTL - cache grows forever
cache.set('user:123', json.dumps(user_data))

# TTL too long for dynamic data
cache.setex('stock_price:AAPL', 86400, price)  # 24 hours!

✅ Correct:

# Appropriate TTLs based on data volatility
cache.setex('user:123', 3600, json.dumps(user_data))  # 1 hour
cache.setex('stock_price:AAPL', 60, price)  # 1 minute
cache.setex('static_config', 86400, config)  # 24 hours OK for static data

💡 TTL Guidelines:

  • User data: 15-60 minutes
  • Product catalogs: 5-30 minutes
  • Real-time prices: 30-60 seconds
  • Static content: Hours to days
  • Session data: 30-60 minutes (refresh on activity)

Mistake 2: Cache Stampede (Thundering Herd)

❌ Wrong:

def get_popular_item():
    cached = cache.get('popular_item')
    if cached:
        return cached
    
    # When cache expires, all requests hit DB simultaneously
    data = expensive_db_query()
    cache.setex('popular_item', 300, data)
    return data

✅ Correct:

import random
import time

def get_popular_item():
    cached = cache.get('popular_item')
    if cached:
        return cached
    
    # Acquire distributed lock
    lock_key = 'lock:popular_item'
    if cache.set(lock_key, '1', nx=True, ex=10):
        try:
            data = expensive_db_query()
            # Add jitter to prevent synchronized expiration
            ttl = 300 + random.randint(-30, 30)
            cache.setex('popular_item', ttl, data)
            return data
        finally:
            cache.delete(lock_key)
    else:
        # Wait briefly and retry
        time.sleep(0.05)
        return get_popular_item()

Mistake 3: Storing Large Objects in Cache

❌ Wrong:

# Storing 5MB image in Redis
image_data = open('large_image.jpg', 'rb').read()
cache.set('image:123', image_data)  # Bad for memory cache!

✅ Correct:

# Store S3 URL instead
image_url = 's3://my-bucket/images/123.jpg'
cache.setex('image_url:123', 3600, image_url)

# Client retrieves from CloudFront
cloudfront_url = 'https://cdn.example.com/images/123.jpg'

💡 Cache Size Guidelines:

  • Redis values: < 100KB per key (< 10KB ideal)
  • CloudFront: Any size (designed for large files)
  • Large data: Store reference/URL, not data itself

Mistake 4: Not Handling Cache Failures Gracefully

❌ Wrong:

def get_data(key):
    # App crashes if Redis is down
    return json.loads(cache.get(key))

✅ Correct:

def get_data(key):
    try:
        cached = cache.get(key)
        if cached:
            return json.loads(cached)
    except (redis.RedisError, json.JSONDecodeError) as e:
        logger.warning(f"Cache error: {e}")
        # Fall through to database
    
    # Always have fallback to primary data source
    return fetch_from_database(key)

Mistake 5: Forgetting to Invalidate Cache on Updates

❌ Wrong:

def update_user(user_id, new_data):
    db.update_user(user_id, new_data)
    # Cache still has old data!
    return new_data

✅ Correct:

def update_user(user_id, new_data):
    # Update database
    db.update_user(user_id, new_data)
    
    # Invalidate cache (option 1)
    cache.delete(f'user:{user_id}')
    
    # OR update cache (option 2 - write-through)
    cache.setex(f'user:{user_id}', 3600, json.dumps(new_data))
    
    return new_data

Mistake 6: Using DAX for Strongly Consistent Reads

❌ Wrong:

# DAX does not cache strongly consistent reads - they pass straight through to DynamoDB
response = dax.get_item(
    TableName='Orders',
    Key={'orderId': {'S': 'order-123'}},
    ConsistentRead=True  # No caching benefit - defeats the purpose of DAX
)

✅ Correct:

# For strongly consistent reads, use DynamoDB directly
dynamodb = boto3.client('dynamodb')
response = dynamodb.get_item(
    TableName='Orders',
    Key={'orderId': {'S': 'order-123'}},
    ConsistentRead=True
)

# Or use DAX for eventually consistent reads
response = dax.get_item(
    TableName='Orders',
    Key={'orderId': {'S': 'order-123'}}
    # ConsistentRead defaults to False
)

Mistake 7: Not Monitoring Cache Performance

❌ Wrong:

# No visibility into cache effectiveness
cache.get(key)  # Is this helping?

✅ Correct:

import json
import time

from datadog import statsd

def get_with_metrics(key):
    start = time.time()
    cached = cache.get(key)
    duration = time.time() - start
    
    if cached:
        statsd.increment('cache.hit')
        statsd.histogram('cache.latency', duration)
        return json.loads(cached)
    else:
        statsd.increment('cache.miss')
        data = fetch_from_db(key)
        cache.setex(key, 300, json.dumps(data))
        return data

# Monitor in CloudWatch:
# - Cache hit rate
# - Eviction rate
# - CPU/memory usage
# - Connection count
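
ElastiCache also publishes hit and miss counters to CloudWatch automatically. A minimal boto3 sketch for computing a Redis hit rate from those metrics (the cluster ID below is a placeholder) could look like this:

# Sketch: compute an ElastiCache (Redis) hit rate from CloudWatch metrics
from datetime import datetime, timedelta

import boto3

cloudwatch = boto3.client('cloudwatch')

def metric_sum(metric_name, cluster_id='my-redis-001'):
    response = cloudwatch.get_metric_statistics(
        Namespace='AWS/ElastiCache',
        MetricName=metric_name,               # 'CacheHits' or 'CacheMisses'
        Dimensions=[{'Name': 'CacheClusterId', 'Value': cluster_id}],
        StartTime=datetime.utcnow() - timedelta(hours=1),
        EndTime=datetime.utcnow(),
        Period=3600,
        Statistics=['Sum']
    )
    datapoints = response['Datapoints']
    return datapoints[0]['Sum'] if datapoints else 0

hits = metric_sum('CacheHits')
misses = metric_sum('CacheMisses')
total = hits + misses
print(f"Hit rate: {hits / total:.1%}" if total else "No cache traffic in window")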

Key Takeaways 🎓

📋 Quick Reference Card

Service               | Best For                              | Latency | Key Feature
CloudFront            | Static content, APIs, global users    | 10-50ms | Edge caching, CDN
ElastiCache Redis     | Sessions, complex data, pub/sub       | ~1ms    | Data structures, persistence
ElastiCache Memcached | Simple key-value, horizontal scaling  | ~1ms    | Multi-threaded, simple
DAX                   | DynamoDB read-heavy workloads         | ~400μs  | Drop-in DynamoDB replacement

🔑 Essential Patterns

Cache-Aside   | Check cache → miss → query DB → populate cache
Write-Through | Write to DB → immediately update cache
Write-Behind  | Write to cache → async write to DB (sketched below)
Cache Warming | Pre-populate cache before traffic
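
Write-behind is not demonstrated elsewhere in this lesson, so here is a minimal in-process sketch that reuses the cache and db placeholders from earlier examples. A production version would usually push to a durable queue (such as SQS) instead of an in-memory Python queue.

# Sketch: write-behind - update the cache immediately, defer the database
# write to a background worker that drains a queue
import json
import queue
import threading

write_queue = queue.Queue()

def write_behind_update(user_id, data):
    cache.setex(f"user:{user_id}", 3600, json.dumps(data))  # fast path
    write_queue.put((user_id, data))                         # defer DB write

def db_writer():
    while True:
        user_id, data = write_queue.get()
        db.update_user(user_id, data)   # placeholder DB call
        write_queue.task_done()

threading.Thread(target=db_writer, daemon=True).start()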

⚡ Performance Tips

  • Multi-tier caching: Local → Redis → Database
  • Set appropriate TTLs: Match data volatility
  • Use pipelining: Batch Redis commands (see the sketch after this list)
  • Monitor hit rates: Target >80% for reads
  • Handle failures: Always have database fallback
  • Add jitter to TTLs: Prevent synchronized expiration
  • Use connection pooling: Reduce connection overhead
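
Pipelining and connection pooling each take only a few lines with redis-py; here is a minimal sketch (the endpoint is a placeholder):

# Sketch: connection pooling plus pipelining with redis-py
import redis

pool = redis.ConnectionPool(
    host='my-cluster.cache.amazonaws.com',
    port=6379,
    decode_responses=True,
    max_connections=50
)
cache = redis.Redis(connection_pool=pool)

# Batch several reads into a single round trip instead of five
pipe = cache.pipeline()
for product_id in ['p1', 'p2', 'p3', 'p4', 'p5']:
    pipe.get(f"product:{product_id}")
results = pipe.execute()  # list of values, None for missing keys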

🧠 Memory Devices

"CloudFRONT"                       | Cloud in FRONT of your servers (edge caching)
"DAX = DynamoDB Accelerator"       | Accelerates DynamoDB, not other databases
"Redis = Remote Dictionary Server" | Dictionary = key-value + more structures
"Cache L-TTL"                      | Low TTL for dynamic, Long TTL for static

🤔 Did You Know?

  • Netflix uses EVCache (built on Memcached) to cache 500+ TB of data across 30,000+ nodes
  • CloudFront handles over 500 billion requests per day
  • Redis sorted sets power Reddit's comment ranking algorithms
  • A 1% cache hit rate improvement can reduce infrastructure costs by 10-20%
  • Twitter uses Redis to cache timelines for 400+ million users
  • DAX can reduce DynamoDB read costs by up to 90% for read-heavy workloads

📚 Further Study

  1. AWS ElastiCache Best Practices: https://docs.aws.amazon.com/AmazonElastiCache/latest/red-ug/BestPractices.html
  2. CloudFront Developer Guide: https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/Introduction.html
  3. DAX Documentation: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DAX.html

Next Steps: Practice implementing these patterns in your own projects. Start with simple cache-aside for database queries, then add CloudFront for static assets, and finally experiment with Redis data structures for more complex use cases. Monitor your cache hit rates and iterate on TTL values to optimize performance!