Advanced Architecture Patterns
Multi-region, hybrid cloud, service mesh, and migration strategies
Advanced Architecture Patterns in AWS
Master AWS advanced architecture patterns with free flashcards and spaced repetition practice. This lesson covers event-driven architectures, CQRS and Event Sourcing, microservices patterns, saga orchestration, and strangler fig patterns: essential concepts for building scalable, resilient cloud systems at the enterprise level.
Welcome to Advanced AWS Architecture
Welcome to the world of sophisticated cloud architecture! As you progress beyond basic AWS services, you'll need patterns that handle complexity, scale, and distributed system challenges. This lesson explores battle-tested architectural patterns used by organizations processing billions of requests daily.
You'll learn how Netflix, Amazon, and other tech giants structure their cloud applications to handle massive scale while maintaining reliability. These patterns represent years of collective learning from production failures and successes.
What You'll Master:
- Event-driven architecture and choreography
- CQRS (Command Query Responsibility Segregation)
- Event Sourcing principles
- Saga patterns for distributed transactions
- Strangler Fig for legacy migration
- Circuit breakers and bulkheads
- Service mesh concepts
💡 Pro Tip: These patterns aren't just theoretical; they solve real problems like eventual consistency, distributed transactions, and system resilience. Understanding when to apply each pattern is as important as knowing how.
Core Concepts: Event-Driven Architecture (EDA)
Event-driven architecture is a design pattern where services communicate through events rather than direct calls. An event represents a significant state change in the system.
Key Components:
| Component | AWS Service | Purpose |
|---|---|---|
| Event Producers | Lambda, EC2, ECS | Generate events when state changes |
| Event Router | EventBridge, SNS | Routes events to interested consumers |
| Event Consumers | Lambda, SQS, Step Functions | React to events asynchronously |
| Event Store | DynamoDB Streams, Kinesis | Persistent event log |
Event-Driven Flow Diagram:
+-----------------------------------------------------------+
|                 EVENT-DRIVEN ARCHITECTURE                  |
+-----------------------------------------------------------+

 Order Service --- OrderPlaced event ---> EventBridge
                                              |
                                   (routed by rule)
                        +---------------------+
                        v                     v
                 Email Service         Payment Service
                 (send email)         (process payment)
                                              |
                                 PaymentConfirmed event
                                              |
                        +---------------------+
                        v                     v
                 Order Service            Analytics
                 (update status)       (record metrics)
Benefits:
- Loose Coupling: Services don't need to know about each other
- Scalability: Consumers process events at their own pace
- Resilience: Failed consumers don't affect producers
- Auditability: Event log provides complete history
Trade-offs:
- Eventual Consistency: No immediate guarantees
- Debugging Complexity: Harder to trace request flows
- Message Ordering: Requires careful design
💡 When to Use EDA:
- Systems with many loosely-coupled services
- High-scale applications requiring independent scaling
- Audit requirements needing complete event history
- Real-time data processing pipelines
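To make the flow concrete, here is a minimal sketch of publishing an OrderPlaced event to EventBridge with boto3. The bus name, source, and detail fields are assumptions for this example, not fixed AWS names:

import json
import boto3

events = boto3.client('events')

def publish_order_placed(order):
    """Publish an OrderPlaced event to a custom EventBridge bus."""
    # A rule on the bus would match this event with a pattern like:
    #   {"source": ["orders.service"], "detail-type": ["OrderPlaced"]}
    response = events.put_events(
        Entries=[{
            'EventBusName': 'orders-bus',   # assumed bus name
            'Source': 'orders.service',     # assumed source
            'DetailType': 'OrderPlaced',
            'Detail': json.dumps(order)
        }]
    )
    # put_events reports per-entry failures instead of raising
    if response['FailedEntryCount'] > 0:
        raise RuntimeError(f"Failed to publish: {response['Entries']}")
    return response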
CQRS: Command Query Responsibility Segregation
CQRS separates read operations (queries) from write operations (commands) using different models. This pattern optimizes each side independently.
Traditional vs CQRS Architecture:
+-----------------------------------------------------------+
|                  TRADITIONAL ARCHITECTURE                  |
+-----------------------------------------------------------+

 Client --- READ  ---+
                     +---> Single Model <---> Database
        --- WRITE ---+
        (same model serves reads and writes)

+-----------------------------------------------------------+
|                     CQRS ARCHITECTURE                      |
+-----------------------------------------------------------+

 Client --- READ  ---> Query Model  <---> Read DB (denormalized, optimized)
                            ^
                            | sync events
                            |
        --- WRITE ---> Command Model <---> Write DB (normalized)
Implementing CQRS with AWS:
## Command Side - Write Model
import uuid
import boto3
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
events_table = dynamodb.Table('EventStore')
commands_table = dynamodb.Table('Commands')
class OrderCommandHandler:
def create_order(self, order_data):
# Validate command
if not self._validate_order(order_data):
raise ValueError("Invalid order data")
# Store command
command_id = str(uuid.uuid4())
commands_table.put_item(
Item={
'commandId': command_id,
'commandType': 'CreateOrder',
'data': order_data,
'timestamp': datetime.utcnow().isoformat()
}
)
# Emit event
event = {
'eventId': str(uuid.uuid4()),
'eventType': 'OrderCreated',
'aggregateId': order_data['orderId'],
'data': order_data,
'timestamp': datetime.utcnow().isoformat()
}
events_table.put_item(Item=event)
# Publish to EventBridge
self._publish_event(event)
return command_id
## Query Side - Read Model
import boto3
from boto3.dynamodb.conditions import Key
dynamodb = boto3.resource('dynamodb')
read_model_table = dynamodb.Table('OrderReadModel')
class OrderQueryHandler:
def get_order_summary(self, order_id):
# Optimized read model with denormalized data
response = read_model_table.get_item(
Key={'orderId': order_id}
)
return response.get('Item')
def get_customer_orders(self, customer_id):
# GSI optimized for customer queries
response = read_model_table.query(
IndexName='CustomerIndex',
KeyConditionExpression=Key('customerId').eq(customer_id)
)
return response.get('Items', [])
def get_order_analytics(self, date_range):
# Materialized view optimized for analytics
# Pre-aggregated data updated by event handlers
return self._query_analytics_view(date_range)
## Event Handler - Synchronizes Read Model
import boto3
import json
dynamodb = boto3.resource('dynamodb')
read_model_table = dynamodb.Table('OrderReadModel')
def lambda_handler(event, context):
    """Process OrderCreated events delivered via an SQS queue subscribed to EventBridge"""
for record in event['Records']:
event_data = json.loads(record['body'])
if event_data['eventType'] == 'OrderCreated':
# Update read model with denormalized data
order = event_data['data']
read_model_table.put_item(
Item={
'orderId': order['orderId'],
'customerId': order['customerId'],
'customerName': order['customerName'],
'totalAmount': order['totalAmount'],
'status': 'CREATED',
'items': order['items'],
'createdAt': event_data['timestamp'],
# Denormalized customer info for fast reads
'customerEmail': order['customerEmail'],
'shippingAddress': order['shippingAddress']
}
)
CQRS Benefits:
- Performance: Optimize reads and writes separately
- Scalability: Scale read and write sides independently
- Flexibility: Different models for different needs
- Security: Separate permissions for commands and queries
Implementation Considerations:
| Aspect | Command Side | Query Side |
|---|---|---|
| Database | DynamoDB (single-table) | OpenSearch, DynamoDB (GSIs) |
| Consistency | Strong consistency | Eventual consistency |
| Schema | Normalized | Denormalized |
| Operations | Write-heavy, validations | Read-heavy, fast queries |
⚠️ Common Mistake: Applying CQRS everywhere. Use it only when read/write patterns significantly differ or when you need independent scaling.
Event Sourcing: The Complete Audit Trail
Event Sourcing stores state changes as a sequence of events rather than storing just the current state. The current state is derived by replaying events.
Traditional vs Event Sourcing:
+-----------------------------------------------------------+
|                 TRADITIONAL STATE STORAGE                  |
+-----------------------------------------------------------+
 Database record:
   +---------------------------+
   | orderId: 12345            |
   | status:  SHIPPED          |  <-- only current state;
   | amount:  $50.00           |      history is lost
   +---------------------------+

+-----------------------------------------------------------+
|                   EVENT SOURCING STORAGE                   |
+-----------------------------------------------------------+
 Event stream (orderId: 12345):
   +----------------------------------------------+
   | 1. OrderCreated    { amount: $50 }           |
   | 2. PaymentReceived { method: "card" }        |
   | 3. OrderShipped    { carrier: "FedEx" }      |
   | 4. OrderDelivered  { timestamp: "..." }      |
   +----------------------------------------------+
                  |
                  v  replay events to derive current state
   +---------------------------+
   | Current state:            |
   | status: DELIVERED         |
   | amount: $50.00            |
   +---------------------------+
Event Sourcing Implementation:
## Event Store using DynamoDB
import uuid
import json
from datetime import datetime

import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError

class ConcurrencyException(Exception):
    """Raised when an optimistic concurrency check fails."""
    pass
class EventStore:
def __init__(self):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table('EventStore')
def append_event(self, aggregate_id, event_type, event_data, expected_version):
"""Append event with optimistic concurrency control"""
event = {
'aggregateId': aggregate_id,
'version': expected_version + 1,
'eventType': event_type,
'eventData': json.dumps(event_data),
'timestamp': datetime.utcnow().isoformat(),
'eventId': str(uuid.uuid4())
}
        try:
            # Conditional write: the put fails if an event with this exact
            # (aggregateId, version) key already exists
            self.table.put_item(
                Item=event,
                ConditionExpression='attribute_not_exists(aggregateId)'
            )
            return event
return event
except ClientError as e:
if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
raise ConcurrencyException("Version conflict detected")
raise
def get_events(self, aggregate_id, from_version=0):
"""Retrieve event stream for an aggregate"""
response = self.table.query(
KeyConditionExpression=Key('aggregateId').eq(aggregate_id) & Key('version').gte(from_version),
ScanIndexForward=True # Chronological order
)
return response.get('Items', [])
## Aggregate that uses Event Sourcing
class InvalidOperationException(Exception):
    pass

class OrderAggregate:
def __init__(self, order_id):
self.order_id = order_id
self.version = 0
self.state = {
'status': 'PENDING',
'items': [],
'total': 0
}
self.uncommitted_events = []
    def load_from_history(self, events):
        """Rebuild state by replaying stored events"""
        for record in events:
            # The store keeps each payload as a JSON string in 'eventData'
            self._apply_event(json.loads(record['eventData']))
            self.version = record['version']
def create_order(self, items, customer_id):
"""Command: Create new order"""
if self.version > 0:
raise InvalidOperationException("Order already exists")
event = {
'eventType': 'OrderCreated',
'orderId': self.order_id,
'customerId': customer_id,
'items': items,
'total': sum(item['price'] for item in items)
}
self._apply_event(event)
self.uncommitted_events.append(event)
def ship_order(self, carrier, tracking_number):
"""Command: Ship order"""
if self.state['status'] != 'PAID':
raise InvalidOperationException("Can only ship paid orders")
event = {
'eventType': 'OrderShipped',
'orderId': self.order_id,
'carrier': carrier,
'trackingNumber': tracking_number
}
self._apply_event(event)
self.uncommitted_events.append(event)
def _apply_event(self, event):
"""Apply event to update internal state"""
event_type = event['eventType']
if event_type == 'OrderCreated':
self.state['items'] = event['items']
self.state['total'] = event['total']
self.state['status'] = 'CREATED'
elif event_type == 'PaymentReceived':
self.state['status'] = 'PAID'
self.state['paymentMethod'] = event['paymentMethod']
elif event_type == 'OrderShipped':
self.state['status'] = 'SHIPPED'
self.state['carrier'] = event['carrier']
self.state['trackingNumber'] = event['trackingNumber']
elif event_type == 'OrderDelivered':
self.state['status'] = 'DELIVERED'
self.state['deliveredAt'] = event['timestamp']
## Repository that manages aggregate persistence
class OrderRepository:
def __init__(self, event_store):
self.event_store = event_store
def get(self, order_id):
"""Load aggregate from event stream"""
events = self.event_store.get_events(order_id)
order = OrderAggregate(order_id)
order.load_from_history(events)
return order
def save(self, order):
"""Persist new events to store"""
for event in order.uncommitted_events:
self.event_store.append_event(
order.order_id,
event['eventType'],
event,
order.version
)
order.version += 1
order.uncommitted_events.clear()
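To tie the pieces together, a brief usage sketch (assuming the EventStore table above exists) showing the write-then-rehydrate cycle:

## Illustrative round trip: create, persist, reload
store = EventStore()
repo = OrderRepository(store)

order = OrderAggregate('order-123')
order.create_order(
    items=[{'sku': 'ABC', 'price': 25}, {'sku': 'XYZ', 'price': 25}],
    customer_id='cust-42'
)
repo.save(order)  # appends OrderCreated to the stream

# Later: current state is rebuilt purely from the event stream
same_order = repo.get('order-123')
assert same_order.state['total'] == 50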
Event Sourcing Benefits:
- Complete Audit Trail: Every state change is recorded
- Time Travel: Reconstruct state at any point in time
- Debugging: Replay events to reproduce issues
- Event Replay: Rebuild read models from scratch
- Business Insights: Analyze event patterns
Snapshots for Performance:
class SnapshotStore:
    def __init__(self):
        # Snapshot table name is assumed for this sketch
        self.snapshot_table = boto3.resource('dynamodb').Table('Snapshots')

    def save_snapshot(self, aggregate_id, version, state):
"""Save snapshot every N events"""
if version % 100 == 0: # Snapshot every 100 events
self.snapshot_table.put_item(
Item={
'aggregateId': aggregate_id,
'version': version,
'state': json.dumps(state),
'timestamp': datetime.utcnow().isoformat()
}
)
def get_snapshot(self, aggregate_id):
"""Get latest snapshot"""
response = self.snapshot_table.query(
KeyConditionExpression=Key('aggregateId').eq(aggregate_id),
ScanIndexForward=False, # Latest first
Limit=1
)
        items = response.get('Items', [])
        return items[0] if items else None
💡 Best Practice: Use snapshots to avoid replaying thousands of events. Load the latest snapshot, then replay only subsequent events.
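Putting the two stores together, a hedged sketch of a loader that starts from the latest snapshot and replays only newer events (reusing the classes and json import from above):

def load_order(order_id, event_store, snapshot_store):
    """Rehydrate an aggregate from a snapshot plus subsequent events."""
    order = OrderAggregate(order_id)
    snapshot = snapshot_store.get_snapshot(order_id)
    if snapshot:
        # Restore snapshotted state, then replay only what came after it
        order.state = json.loads(snapshot['state'])
        order.version = snapshot['version']
        events = event_store.get_events(order_id, from_version=snapshot['version'] + 1)
    else:
        events = event_store.get_events(order_id)
    order.load_from_history(events)
    return order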
Saga Pattern: Distributed Transactions
The Saga pattern manages distributed transactions across multiple services using a sequence of local transactions, each publishing events or messages.
Two Saga Implementation Styles:
+-----------------------------------------------------------+
|                  CHOREOGRAPHY-BASED SAGA                   |
|                (decentralized, event-driven)               |
+-----------------------------------------------------------+

 Order Service     -- 1. create order    --> OrderCreated event
 Payment Service   -- 2. process payment --> PaymentProcessed event
 Inventory Service -- 3. reserve items   --> ItemsReserved event
 Shipping Service  -- 4. create shipment --> saga complete

 On failure: each service listens for failure events and runs
 its own compensating action.

+-----------------------------------------------------------+
|                 ORCHESTRATION-BASED SAGA                   |
|                (centralized, Step Functions)               |
+-----------------------------------------------------------+

            Saga Orchestrator (Step Functions)
                |           |           |
                v           v           v
            1. Order   2. Payment  3. Inventory  -- FAILS!
                                        |
                                        v
                        Compensating transactions
                |           |           |
                v           v           v
              Undo        Refund      Cancel
              Order       Payment     Reservation
Orchestration Saga with Step Functions:
{
"Comment": "Order Processing Saga",
"StartAt": "CreateOrder",
"States": {
"CreateOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:CreateOrder",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"Next": "OrderFailed"
}
],
"Next": "ProcessPayment"
},
"ProcessPayment": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:ProcessPayment",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "CancelOrder"
}
],
"Next": "ReserveInventory"
},
"ReserveInventory": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:ReserveInventory",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "RefundPayment"
}
],
"Next": "CreateShipment"
},
"CreateShipment": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:CreateShipment",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "ReleaseInventory"
}
],
"Next": "OrderCompleted"
},
"CancelOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:CancelOrder",
"Next": "OrderFailed"
},
"RefundPayment": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:RefundPayment",
"Next": "CancelOrder"
},
"ReleaseInventory": {
"Type": "Task",
"Resource": "arn:aws:lambda:region:account:function:ReleaseInventory",
"Next": "RefundPayment"
},
"OrderCompleted": {
"Type": "Succeed"
},
"OrderFailed": {
"Type": "Fail",
"Error": "OrderProcessingFailed",
"Cause": "One or more steps in the saga failed"
}
}
}
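Once the state machine is deployed, starting the saga is a single API call. A minimal sketch (the state machine ARN is a placeholder):

import json
import boto3

sfn = boto3.client('stepfunctions')

def start_order_saga(order):
    """Start one saga execution per order."""
    return sfn.start_execution(
        stateMachineArn='arn:aws:states:region:account:stateMachine:OrderSaga',
        # A deterministic name gives idempotency: Step Functions rejects a
        # second standard-workflow execution with the same name
        name=f"order-{order['orderId']}",
        input=json.dumps(order)
    )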
Compensating Transaction Implementation:
## Payment Service with compensation
import uuid
from datetime import datetime

import boto3

class PaymentException(Exception):
    pass

class CompensationException(Exception):
    pass

class PaymentService:
    def __init__(self):
        self.table = boto3.resource('dynamodb').Table('Payments')
def process_payment(self, order_id, amount, payment_method):
"""Forward transaction"""
payment_id = str(uuid.uuid4())
try:
# Call payment gateway
result = self._charge_payment(amount, payment_method)
# Record payment
self.table.put_item(
Item={
'paymentId': payment_id,
'orderId': order_id,
'amount': amount,
'status': 'COMPLETED',
'transactionId': result['transactionId'],
'timestamp': datetime.utcnow().isoformat()
}
)
return {'paymentId': payment_id, 'status': 'SUCCESS'}
except PaymentException as e:
# Record failed payment
self.table.put_item(
Item={
'paymentId': payment_id,
'orderId': order_id,
'amount': amount,
'status': 'FAILED',
'error': str(e),
'timestamp': datetime.utcnow().isoformat()
}
)
raise
def refund_payment(self, payment_id):
"""Compensating transaction"""
# Get original payment
response = self.table.get_item(Key={'paymentId': payment_id})
payment = response['Item']
if payment['status'] != 'COMPLETED':
return {'status': 'ALREADY_REFUNDED'}
# Process refund
try:
refund_result = self._process_refund(payment['transactionId'])
# Update payment record
self.table.update_item(
Key={'paymentId': payment_id},
UpdateExpression='SET #status = :status, refundId = :refundId',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={
':status': 'REFUNDED',
':refundId': refund_result['refundId']
}
)
return {'status': 'REFUNDED', 'refundId': refund_result['refundId']}
except Exception as e:
# Log compensation failure for manual intervention
self._log_compensation_failure(payment_id, str(e))
raise CompensationException(f"Failed to refund payment: {e}")
Choreography vs Orchestration:
| Aspect | Choreography | Orchestration |
|---|---|---|
| Control | Decentralized | Centralized |
| Coupling | Loose (event-driven) | Tighter (orchestrator knows all) |
| Visibility | Hard to track flow | Clear workflow definition |
| Complexity | Distributed logic | Central logic |
| Best For | Simple flows, fewer services | Complex flows, many steps |
💡 Pro Tip: Use orchestration (Step Functions) for complex business processes. Use choreography (EventBridge) for loosely-coupled event reactions.
Strangler Fig Pattern: Legacy Migration
The Strangler Fig pattern incrementally migrates legacy systems by gradually replacing functionality with new services.
+-----------------------------------------------------------+
|               STRANGLER FIG MIGRATION PHASES               |
+-----------------------------------------------------------+

 PHASE 1: Legacy system        PHASE 2: Hybrid
 +--------------------+        +----------------------+
 |      MONOLITH      |        | NEW API              |
 |                    |        |   /users  /products  |
 |   all routes       |  --->  +----------------------+
 |   all logic        |        | proxy / router       |
 |   all data         |        +----------------------+
 +--------------------+        | LEGACY               |
                               |   /orders  /reports  |
                               +----------------------+

 PHASE 3: Mostly migrated      PHASE 4: Complete
 +--------------------+        +----------------------+
 | MICROSERVICES      |        | NEW SYSTEM           |
 |   /users           |        |   all routes         |
 |   /products        |  --->  |   all services       |
 |   /orders          |        |                      |
 +--------------------+        | (legacy              |
 | LEGACY (10%)       |        |  decommissioned)     |
 |   /reports         |        +----------------------+
 +--------------------+
Implementation with API Gateway:
## Lambda@Edge function for routing
import json
MIGRATED_ROUTES = {
'/api/users',
'/api/products',
'/api/inventory'
}
NEW_SERVICE_URL = 'https://new-api.example.com'
LEGACY_SERVICE_URL = 'https://legacy.example.com'
def lambda_handler(event, context):
"""Route requests between new and legacy systems"""
request = event['Records'][0]['cf']['request']
uri = request['uri']
# Route migrated paths to new service
if any(uri.startswith(route) for route in MIGRATED_ROUTES):
request['origin'] = {
'custom': {
'domainName': NEW_SERVICE_URL.replace('https://', ''),
'port': 443,
'protocol': 'https',
'path': '',
'sslProtocols': ['TLSv1.2'],
'readTimeout': 30,
'keepaliveTimeout': 5
}
}
        request['headers']['x-forwarded-host'] = [{
            'key': 'X-Forwarded-Host',
            # Header values must be a bare hostname, not a full URL
            'value': NEW_SERVICE_URL.replace('https://', '')
        }]
else:
# Route to legacy system
request['origin'] = {
'custom': {
'domainName': LEGACY_SERVICE_URL.replace('https://', ''),
'port': 443,
'protocol': 'https',
'path': '',
'sslProtocols': ['TLSv1.2'],
'readTimeout': 30,
'keepaliveTimeout': 5
}
}
return request
Data Migration Strategy:
## Dual-write pattern during migration
import logging
import uuid
from datetime import datetime

import boto3

logger = logging.getLogger(__name__)

class ServiceException(Exception):
    pass

class UserNotFoundException(Exception):
    pass

class UserService:
    def __init__(self):
        self.new_db = boto3.resource('dynamodb').Table('Users-New')
        self.legacy_db = self._get_legacy_connection()
def create_user(self, user_data):
"""Write to both new and legacy systems"""
user_id = str(uuid.uuid4())
# Write to new system (source of truth)
try:
self.new_db.put_item(
Item={
'userId': user_id,
'email': user_data['email'],
'name': user_data['name'],
'createdAt': datetime.utcnow().isoformat()
}
)
except Exception as e:
raise ServiceException(f"Failed to write to new system: {e}")
# Best-effort write to legacy (for backwards compatibility)
try:
self.legacy_db.insert_user(user_id, user_data)
except Exception as e:
# Log but don't fail - legacy write is secondary
logger.warning(f"Failed legacy write for user {user_id}: {e}")
return user_id
def get_user(self, user_id):
"""Read from new system, fallback to legacy"""
try:
response = self.new_db.get_item(Key={'userId': user_id})
if 'Item' in response:
return response['Item']
except Exception as e:
logger.error(f"Failed to read from new system: {e}")
# Fallback to legacy
try:
user = self.legacy_db.get_user(user_id)
# Asynchronously migrate to new system
self._schedule_migration(user_id, user)
return user
except Exception as e:
raise UserNotFoundException(f"User {user_id} not found")
Migration Phases:
- Coexist: New service deployed alongside legacy
- Intercept: Proxy routes traffic based on rules
- Dual-Write: Write to both systems during transition
- Migrate Data: Gradually move historical data
- Switch Read: Change to read from new system
- Remove Legacy: Decommission old system
⚠️ Critical Considerations:
- Data Consistency: Handle dual-write failures gracefully
- Rollback Plan: Must be able to revert routing
- Feature Parity: Ensure new service matches legacy capabilities
- Testing: Shadow traffic, canary deployments
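One way to implement the canary idea above is percentage-based routing in the proxy layer. A hedged sketch, with the route list and rollout percentage as assumptions:

import hashlib

MIGRATED_ROUTES = ('/api/users', '/api/products')
NEW_TRAFFIC_PERCENT = 10  # start small, raise as confidence grows

def choose_backend(uri, user_id):
    """Deterministically send a fixed percentage of users to the new service."""
    if not uri.startswith(MIGRATED_ROUTES):
        return 'legacy'
    # Hash the user id so each user consistently hits the same backend
    bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    return 'new' if bucket < NEW_TRAFFIC_PERCENT else 'legacy'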
Circuit Breaker and Resilience Patterns
Circuit Breaker prevents cascading failures by stopping calls to failing services and allowing them time to recover.
Circuit Breaker States:
+-----------------------------------------------------------+
|               CIRCUIT BREAKER STATE MACHINE                |
+-----------------------------------------------------------+

                  +----------------+
        +-------->|     CLOSED     |  normal operation;
        |         | (requests pass)|  failures are counted
        |         +-------+--------+
        |                 | failures exceed threshold
        |                 v
        |         +----------------+
        |   +---->|      OPEN      |  fail fast; calls are
        |   |     |                |  rejected immediately
        |   |     +-------+--------+
        |   |             | timeout expires
        |   |             v
        |   |     +----------------+
        |   |     |   HALF-OPEN    |  limited trial requests
        |   |     +-------+--------+
        |   |             |
        |   +-- failure --+-- success threshold met --+
        |                                             |
        +---------------------------------------------+
                      (back to CLOSED)
Implementation:
## Circuit Breaker implementation
import time
from enum import Enum
from threading import Lock

class CircuitOpenException(Exception):
    """Raised when a call is rejected because the circuit is open."""
    pass

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(self, failure_threshold=5, timeout=60, success_threshold=2):
self.failure_threshold = failure_threshold
self.timeout = timeout # seconds
self.success_threshold = success_threshold
self.failure_count = 0
self.success_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
self.lock = Lock()
def call(self, func, *args, **kwargs):
"""Execute function with circuit breaker protection"""
with self.lock:
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
self.success_count = 0
else:
raise CircuitOpenException("Circuit breaker is OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
"""Handle successful call"""
with self.lock:
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.success_count = 0
def _on_failure(self):
"""Handle failed call"""
with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
self.success_count = 0
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
def _should_attempt_reset(self):
"""Check if timeout has elapsed"""
return (time.time() - self.last_failure_time) >= self.timeout
## Usage with AWS service calls
import json
from datetime import datetime

import boto3
class ResilientPaymentService:
def __init__(self):
self.circuit_breaker = CircuitBreaker(
failure_threshold=5,
timeout=60,
success_threshold=2
)
self.payment_gateway_url = 'https://payment-api.example.com'
def process_payment(self, amount, payment_method):
"""Process payment with circuit breaker"""
try:
return self.circuit_breaker.call(
self._call_payment_gateway,
amount,
payment_method
)
except CircuitOpenException:
# Circuit is open - use fallback
return self._fallback_payment_handling(amount, payment_method)
except Exception as e:
# Other errors
raise PaymentException(f"Payment failed: {e}")
def _call_payment_gateway(self, amount, payment_method):
"""Actual payment gateway call"""
import requests
response = requests.post(
f"{self.payment_gateway_url}/charge",
json={'amount': amount, 'method': payment_method},
timeout=5
)
response.raise_for_status()
return response.json()
def _fallback_payment_handling(self, amount, payment_method):
"""Fallback when circuit is open"""
# Queue payment for later processing
sqs = boto3.client('sqs')
sqs.send_message(
QueueUrl='https://sqs.region.amazonaws.com/account/pending-payments',
MessageBody=json.dumps({
'amount': amount,
'paymentMethod': payment_method,
'timestamp': datetime.utcnow().isoformat()
})
)
return {'status': 'QUEUED', 'message': 'Payment will be processed later'}
Bulkhead Pattern:
## Bulkhead pattern - isolate resources
from concurrent.futures import ThreadPoolExecutor
import boto3
class BulkheadService:
def __init__(self):
# Separate thread pools for different operations
self.critical_pool = ThreadPoolExecutor(max_workers=10)
self.standard_pool = ThreadPoolExecutor(max_workers=5)
self.background_pool = ThreadPoolExecutor(max_workers=2)
def process_critical_request(self, data):
"""Critical operations get dedicated resources"""
future = self.critical_pool.submit(self._process_order, data)
return future.result(timeout=5)
def process_standard_request(self, data):
"""Standard operations share separate pool"""
future = self.standard_pool.submit(self._process_query, data)
return future.result(timeout=10)
def process_background_task(self, data):
"""Background tasks don't impact critical operations"""
self.background_pool.submit(self._generate_report, data)
return {'status': 'QUEUED'}
Resilience Patterns Summary:
| Pattern | Purpose | AWS Implementation |
|---|---|---|
| Circuit Breaker | Prevent cascading failures | Lambda, custom middleware |
| Retry with Backoff | Handle transient failures | SDK built-in, SQS redrives |
| Timeout | Prevent hanging requests | Lambda timeout, API Gateway timeout |
| Bulkhead | Isolate resources | Separate Lambda functions, thread pools |
| Fallback | Graceful degradation | Cache, default responses |
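One note on the Retry with Backoff row: the AWS SDKs already retry transient failures, and the behavior is configurable. A sketch using botocore's Config (table name assumed):

import boto3
from botocore.config import Config

# 'standard' mode uses jittered exponential backoff; 'adaptive' additionally
# slows the client down in response to server-side throttling
retry_config = Config(retries={'max_attempts': 5, 'mode': 'standard'})

dynamodb = boto3.resource('dynamodb', config=retry_config)
table = dynamodb.Table('Orders')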
Common Mistakes and How to Avoid Them
1. Applying CQRS Everywhere
❌ Wrong:
## Every simple entity uses CQRS
class SimpleUserService:
def __init__(self):
self.write_db = WriteDatabase()
self.read_db = ReadDatabase()
self.event_bus = EventBus()
# Unnecessary complexity for simple CRUD!
✅ Right:
## Use CQRS only when read/write patterns differ significantly
class SimpleUserService:
def __init__(self):
self.db = boto3.resource('dynamodb').Table('Users')
# Simple table for simple use cases
class AnalyticsService:
# CQRS makes sense here - complex aggregations
def __init__(self):
self.command_db = DynamoDB() # Normalized writes
self.query_db = ElasticSearch() # Optimized reads
2. Event Sourcing Without Snapshots
❌ Wrong:
def load_aggregate(aggregate_id):
events = event_store.get_all_events(aggregate_id)
# Replaying 100,000 events takes forever!
for event in events:
aggregate.apply(event)
✅ Right:
def load_aggregate(aggregate_id):
snapshot = snapshot_store.get_latest(aggregate_id)
if snapshot:
aggregate.load_snapshot(snapshot)
events = event_store.get_events_after(aggregate_id, snapshot.version)
else:
events = event_store.get_all_events(aggregate_id)
for event in events:
aggregate.apply(event)
3. Saga Without Compensation
❌ Wrong:
def process_order(order_id):
create_order(order_id)
charge_payment(order_id)
reserve_inventory(order_id) # Fails here!
# Payment charged but inventory not reserved - INCONSISTENT!
✅ Right:
def process_order(order_id):
try:
order = create_order(order_id)
try:
payment = charge_payment(order_id)
try:
reserve_inventory(order_id)
except InventoryException:
refund_payment(payment.id) # Compensate
raise
except PaymentException:
cancel_order(order.id) # Compensate
raise
except Exception as e:
logger.error(f"Order {order_id} failed: {e}")
raise
4. Missing Circuit Breaker Timeouts
❌ Wrong:
circuit_breaker = CircuitBreaker(failure_threshold=5)
## No timeout - circuit stays open forever!
✅ Right:
circuit_breaker = CircuitBreaker(
failure_threshold=5,
timeout=60, # Try again after 60 seconds
success_threshold=2 # Need 2 successes to close
)
5. Ignoring Idempotency
❌ Wrong:
def handle_order_event(event):
order = create_order(event['data'])
# If event is replayed, duplicate order created!
✅ Right:
def handle_order_event(event):
idempotency_key = event['eventId']
# Check if already processed
if idempotency_store.exists(idempotency_key):
return idempotency_store.get_result(idempotency_key)
order = create_order(event['data'])
idempotency_store.save(idempotency_key, order)
return order
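To make the fix concrete, a sketch of an idempotency store backed by a DynamoDB conditional write (table and attribute names are assumptions):

import boto3
from botocore.exceptions import ClientError

idempotency_table = boto3.resource('dynamodb').Table('IdempotencyKeys')

def process_once(event, handler):
    """Run handler at most once per eventId, even if the event is replayed."""
    key = event['eventId']
    try:
        # Conditional put fails if this key was already recorded
        idempotency_table.put_item(
            Item={'idempotencyKey': key, 'status': 'IN_PROGRESS'},
            ConditionExpression='attribute_not_exists(idempotencyKey)'
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return None  # duplicate delivery; already handled
        raise
    result = handler(event['data'])
    idempotency_table.update_item(
        Key={'idempotencyKey': key},
        UpdateExpression='SET #s = :done',
        ExpressionAttributeNames={'#s': 'status'},
        ExpressionAttributeValues={':done': 'DONE'}
    )
    return result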
6. Strangler Fig Without Feature Flags
❌ Wrong:
## Hard switch - all or nothing
if uri.startswith('/api/users'):
return route_to_new_service()
✅ Right:
def route_request(uri, user_id):
if uri.startswith('/api/users'):
# Feature flag for gradual rollout
if feature_flags.is_enabled('new_user_service', user_id):
return route_to_new_service()
return route_to_legacy_service()
Key Takeaways
Advanced Architecture Patterns Quick Reference
| Pattern | Use When | Key Benefit |
|---|---|---|
| Event-Driven | Loosely-coupled services | Scalability, resilience |
| CQRS | Read/write patterns differ | Independent optimization |
| Event Sourcing | Audit trail required | Complete history |
| Saga | Distributed transactions | Eventual consistency |
| Strangler Fig | Legacy migration | Incremental change |
| Circuit Breaker | Prevent cascading failure | Resilience |
Remember:
- Pattern Selection: Choose based on specific problems, not trends
- Trade-offs: Every pattern has costs (eventual consistency, complexity, operational overhead)
- Start Simple: Add patterns as needs arise, not preemptively
- Monitor Everything: Distributed systems require comprehensive observability
- Idempotency: Critical for event-driven and saga patterns
- Feature Flags: Essential for gradual rollouts and rollbacks
Mnemonic for Pattern Selection: "ECSS-BR"
- Events for loose coupling
- CQRS when reads ≠ writes
- Sourcing for history
- Saga for distributed txns
- Breaker for resilience
- Replace (Strangler) for migration
Further Study
- AWS Architecture Blog: https://aws.amazon.com/blogs/architecture/
- Martin Fowler on Microservices: https://martinfowler.com/microservices/
- AWS Well-Architected Framework: https://aws.amazon.com/architecture/well-architected/