
Event-Driven & Messaging

EventBridge, SQS, SNS, Kinesis, and MSK for decoupled architectures

Event-Driven Architecture and Messaging on AWS

Master event-driven architecture and messaging services with free flashcards and spaced repetition practice. This lesson covers Amazon EventBridge, SNS, SQS, and architectural patterns for decoupled microservices: essential concepts for building scalable, resilient AWS applications.

Welcome to Event-Driven & Messaging 💬

In modern cloud architectures, services need to communicate without tight coupling. Event-driven architecture (EDA) enables components to react to events asynchronously, improving scalability and fault tolerance. AWS provides several messaging and event services that form the backbone of distributed systems.

Think of event-driven systems like a newsroom 📰: reporters (producers) publish stories, editors (event routers) decide which desks need them, and writers (consumers) work on their copies independently. No one waits around for direct handoffs; work flows naturally through the system.

Core Concepts 🧠

Understanding Event-Driven Architecture

Event-driven architecture is a design pattern where services communicate by producing and consuming events rather than making direct synchronous calls. An event represents a significant change in state: "order placed," "payment processed," "inventory depleted."

TRADITIONAL SYNCHRONOUS
┌─────────┐    HTTP     ┌──────────┐
│Service A│────────────→│Service B │
│ (waits) │←────────────│(responds)│
└─────────┘             └──────────┘
     ⏳ Blocking         ⏳ Coupling

EVENT-DRIVEN ASYNCHRONOUS
┌─────────┐   publish   ┌──────────┐
│Service A│────────────→│Event Bus │
│continues│             │or Queue  │
└─────────┘             └────┬─────┘
                             │subscribe
                        ┌────┴─────┐
                        │Service B │
                        │processes │
                        └──────────┘
     ✅ Non-blocking    ✅ Decoupled

Key benefits:

  • Loose coupling: Services don't need to know about each other
  • Scalability: Consumers can scale independently based on load
  • Resilience: If a consumer fails, messages wait in queues
  • Flexibility: Easy to add new consumers without changing producers
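These properties can be sketched with a toy in-memory event bus. This is purely illustrative (the `EventBus` class and event names are invented for this example; real systems would use SQS, SNS, or EventBridge), but it shows how a producer publishes once while consumers attach and scale independently:

```python
from collections import defaultdict

class EventBus:
    """Toy in-memory event bus illustrating producer/consumer decoupling."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, event_type, handler):
        # New consumers can be added without touching the producer
        self._subscribers[event_type].append(handler)

    def publish(self, event_type, payload):
        # The producer fires and continues; each subscriber gets its own copy
        for handler in self._subscribers[event_type]:
            handler(dict(payload))

bus = EventBus()
received = []
bus.subscribe('order.placed', lambda e: received.append(('inventory', e['orderId'])))
bus.subscribe('order.placed', lambda e: received.append(('email', e['orderId'])))
bus.publish('order.placed', {'orderId': 'order-12345'})
```

Adding a third subscriber (say, analytics) is a single `subscribe` call; the publisher never changes.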

AWS Messaging Services Overview

| Service            | Type      | Use Case                             | Pattern        |
|--------------------|-----------|--------------------------------------|----------------|
| Amazon SQS         | Queue     | Work distribution, buffering         | Point-to-point |
| Amazon SNS         | Pub/Sub   | Fan-out notifications, alerts        | Broadcast      |
| Amazon EventBridge | Event Bus | Application integration, SaaS events | Event routing  |
| Amazon Kinesis     | Stream    | Real-time analytics, logs            | Streaming data |

Amazon SQS (Simple Queue Service) 📬

Amazon SQS is a fully managed message queuing service that enables asynchronous communication between services. Messages are stored reliably until consumers process them.

Queue Types:

  1. Standard Queue

    • Unlimited throughput: Nearly unlimited transactions per second
    • At-least-once delivery: Messages delivered at least once (may duplicate)
    • Best-effort ordering: Messages generally arrive in order but not guaranteed
    • 💡 Use when: High throughput matters more than exact ordering
  2. FIFO Queue (First-In-First-Out)

    • Guaranteed ordering: Messages processed in exact order sent
    • Exactly-once processing: No duplicates
    • Limited throughput: 300 messages per second (3,000 with batching); high throughput mode raises these limits
    • 💡 Use when: Order and deduplication are critical (e.g., financial transactions)
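To make the ordering guarantee concrete, here is a small sketch (not the service itself; the account IDs and bodies are made up) of how FIFO semantics partition messages: strict arrival order within each MessageGroupId, while different groups can be consumed independently:

```python
from collections import defaultdict

def fifo_delivery_order(messages):
    """Group messages by MessageGroupId, preserving arrival order per group."""
    groups = defaultdict(list)
    for m in messages:
        groups[m['MessageGroupId']].append(m['MessageBody'])
    return dict(groups)

msgs = [
    {'MessageGroupId': 'acct-1', 'MessageBody': 'debit 10'},
    {'MessageGroupId': 'acct-2', 'MessageBody': 'credit 5'},
    {'MessageGroupId': 'acct-1', 'MessageBody': 'debit 20'},
]
order = fifo_delivery_order(msgs)
# acct-1 keeps its two debits in order; acct-2 is processed independently
```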

Key Features:

# SQS Message Flow
import boto3

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'

# Producer: Send message
response = sqs.send_message(
    QueueUrl=queue_url,
    MessageBody='Order placed: order-12345',
    MessageAttributes={
        'OrderId': {'StringValue': 'order-12345', 'DataType': 'String'},
        'Priority': {'StringValue': 'high', 'DataType': 'String'}
    }
)

# Consumer: Receive and process
messages = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20  # Long polling
)

for message in messages.get('Messages', []):
    # Process message
    process_order(message['Body'])
    
    # Delete after successful processing
    sqs.delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=message['ReceiptHandle']
    )

Visibility Timeout: When a consumer receives a message, it becomes invisible to other consumers for a configurable period (default 30 seconds). If not deleted within this time, the message becomes visible again for retry.

Dead Letter Queue (DLQ): Messages that fail processing repeatedly (exceed maxReceiveCount) are moved to a DLQ for later analysis.
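The visibility-timeout and DLQ mechanics can be modeled in a few lines. This is an illustrative in-memory sketch (the `SimQueue` class and its fields are invented), not how SQS is actually implemented:

```python
import itertools

class SimQueue:
    """In-memory sketch of SQS visibility timeout and DLQ redrive."""

    def __init__(self, visibility_timeout=30, max_receive_count=3):
        self.visibility_timeout = visibility_timeout
        self.max_receive_count = max_receive_count
        self.messages = []  # waiting and in-flight messages
        self.dlq = []       # messages that exceeded max_receive_count
        self._ids = itertools.count()

    def send(self, body):
        self.messages.append({'id': next(self._ids), 'body': body,
                              'receive_count': 0, 'invisible_until': 0})

    def receive(self, now):
        """Return the first visible message (hiding it), or None."""
        for msg in list(self.messages):
            if msg['invisible_until'] > now:
                continue  # still inside another consumer's visibility timeout
            if msg['receive_count'] >= self.max_receive_count:
                # Delivered too many times without a delete: redrive to DLQ
                self.messages.remove(msg)
                self.dlq.append(msg)
                continue
            msg['receive_count'] += 1
            msg['invisible_until'] = now + self.visibility_timeout
            return msg
        return None

    def delete(self, msg):
        # Consumer acknowledges successful processing
        self.messages.remove(msg)
```

A message received but never deleted reappears after the timeout; once it has been delivered `max_receive_count` times without a delete, the next attempt moves it to the DLQ instead of redelivering it.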

SQS MESSAGE LIFECYCLE
┌──────────────────────────────────────────────┐
│  1. Producer sends message                   │
│     │                                        │
│     ↓                                        │
│  2. Message stored in queue                  │
│     │                                        │
│     ↓                                        │
│  3. Consumer polls queue                     │
│     │                                        │
│     ↓                                        │
│  4. Message invisible (visibility timeout)   │
│     │                                        │
│     ├─→ SUCCESS ──→ 5. Delete message        │
│     │                                        │
│     └─→ FAILURE ──→ 6. Becomes visible again │
│                         │                    │
│                         └─→ Retry or → DLQ   │
└──────────────────────────────────────────────┘

💡 Long Polling: Set WaitTimeSeconds > 0 to reduce empty responses and API costs. The call waits up to 20 seconds for messages to arrive.

Amazon SNS (Simple Notification Service) 📢

Amazon SNS implements publish-subscribe messaging. Publishers send messages to topics, and all subscribed endpoints receive copies simultaneously.

Subscription Types:

  • HTTP/HTTPS endpoints
  • Email/Email-JSON
  • SMS text messages
  • AWS Lambda functions
  • Amazon SQS queues
  • Mobile push notifications

// SNS Fan-out Pattern
const AWS = require('aws-sdk');
const sns = new AWS.SNS();

// Publish to topic
const params = {
    TopicArn: 'arn:aws:sns:us-east-1:123456789:order-events',
    Message: JSON.stringify({
        orderId: 'order-12345',
        customerId: 'cust-789',
        amount: 99.99,
        timestamp: Date.now()
    }),
    Subject: 'New Order Placed',
    MessageAttributes: {
        'event_type': {
            DataType: 'String',
            StringValue: 'order.created'
        }
    }
};

await sns.publish(params).promise();

// All subscribers receive this:
// - Lambda: Updates inventory
// - SQS: Queues for fulfillment
// - Email: Notifies customer service
// - HTTP: Webhooks to analytics platform

Message Filtering: Subscribers can filter messages based on attributes:

{
  "event_type": ["order.created", "order.shipped"],
  "priority": ["high"],
  "region": [{"prefix": "us-"}]
}

Only messages matching the filter policy are delivered to that subscription.
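The evaluation rule is: every key in the policy must match, and within a key the listed values are OR-ed. A simplified sketch (exact-match and prefix rules only; the real SNS policy language supports more operators such as anything-but and numeric ranges):

```python
def matches_filter(policy, attributes):
    """Every policy key must match; within a key, the rule list is OR-ed."""
    for key, rules in policy.items():
        value = attributes.get(key)
        if value is None:
            return False  # attribute missing entirely: no match
        matched = False
        for rule in rules:
            if isinstance(rule, dict) and 'prefix' in rule:
                matched = matched or value.startswith(rule['prefix'])
            else:
                matched = matched or value == rule
        if not matched:
            return False
    return True

# The filter policy from the example above
policy = {
    "event_type": ["order.created", "order.shipped"],
    "priority": ["high"],
    "region": [{"prefix": "us-"}],
}
```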

SNS + SQS Fan-out Pattern 🌟:

                    ┌─────────────┐
                    │  SNS Topic  │
                    │ "OrderEvent"│
                    └──────┬──────┘
                           │ publish
          ┌────────────────┼────────────────┐
          │                │                │
          ↓                ↓                ↓
    ┌──────────┐     ┌──────────┐     ┌──────────┐
    │ SQS      │     │ SQS      │     │ Lambda   │
    │ Inventory│     │ Shipping │     │ Analytics│
    └──────────┘     └──────────┘     └──────────┘
          │                │                │
          ↓                ↓                ↓
    [Process A]      [Process B]      [Process C]
    Independently    Independently    Independently

This pattern combines SNS broadcast with SQS buffering; each subscriber gets its own queue for independent processing.
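One practical detail of this pattern: unless the subscription enables raw message delivery, the SQS body a consumer receives is an SNS envelope, and the original payload sits in its Message field. A small sketch of the unwrapping (the sample envelope below is constructed by hand for illustration):

```python
import json

def unwrap_sns_envelope(sqs_body):
    """Extract the original published payload from an SNS-over-SQS body."""
    envelope = json.loads(sqs_body)         # outer SNS notification
    return json.loads(envelope['Message'])  # inner application payload

# Simulated body as it would arrive in an SQS record
body = json.dumps({
    'Type': 'Notification',
    'TopicArn': 'arn:aws:sns:us-east-1:123456789:order-events',
    'Message': json.dumps({'orderId': 'order-12345', 'amount': 99.99}),
})
payload = unwrap_sns_envelope(body)
```

Setting the RawMessageDelivery attribute on the subscription removes the envelope, so consumers parse the payload directly.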

Amazon EventBridge 🌉

Amazon EventBridge (formerly CloudWatch Events) is a serverless event bus service that routes events between AWS services, SaaS applications, and custom applications.

Key Concepts:

  1. Event Bus: A router that receives events and delivers them to targets based on rules

    • Default event bus: AWS service events
    • Custom event buses: Your application events
    • Partner event buses: SaaS integrations (Zendesk, Datadog, etc.)
  2. Event Pattern: JSON structure defining which events to match

  3. Rules: Connect patterns to targets

  4. Targets: What happens when an event matches (Lambda, Step Functions, SQS, etc.)

// EventBridge Event Structure
{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "Order Placed",
  "source": "com.mycompany.orders",
  "account": "123456789012",
  "time": "2024-01-15T14:30:00Z",
  "region": "us-east-1",
  "resources": [],
  "detail": {
    "orderId": "order-12345",
    "customerId": "cust-789",
    "amount": 99.99,
    "items": [
      {"sku": "WIDGET-001", "quantity": 2}
    ]
  }
}

Event Pattern Matching:

{
  "source": ["com.mycompany.orders"],
  "detail-type": ["Order Placed"],
  "detail": {
    "amount": [{"numeric": [">", 100]}],
    "customerId": [{"prefix": "premium-"}]
  }
}

This rule only triggers for orders over $100 from premium customers.
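A simplified sketch of that matching logic (lists OR their entries, nested objects recurse; only the numeric and prefix operators are modeled here, and real EventBridge numeric rules also support ranges):

```python
def _rule_matches(rule, value):
    if isinstance(rule, dict):
        if 'prefix' in rule:
            return isinstance(value, str) and value.startswith(rule['prefix'])
        if 'numeric' in rule:
            op, threshold = rule['numeric']
            if not isinstance(value, (int, float)):
                return False
            return {'>': value > threshold, '>=': value >= threshold,
                    '<': value < threshold, '<=': value <= threshold}.get(op, False)
    return rule == value

def event_matches(pattern, event):
    """Every pattern key must match; a list of rules is OR-ed."""
    for key, condition in pattern.items():
        value = event.get(key)
        if isinstance(condition, dict):  # nested pattern, e.g. "detail"
            if not isinstance(value, dict) or not event_matches(condition, value):
                return False
        elif not any(_rule_matches(rule, value) for rule in condition):
            return False
    return True

# The rule from the example above
pattern = {
    "source": ["com.mycompany.orders"],
    "detail-type": ["Order Placed"],
    "detail": {
        "amount": [{"numeric": [">", 100]}],
        "customerId": [{"prefix": "premium-"}],
    },
}
```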

Publishing Events:

import boto3
import json
from datetime import datetime

eventbridge = boto3.client('events')

response = eventbridge.put_events(
    Entries=[
        {
            'Time': datetime.now(),
            'Source': 'com.mycompany.orders',
            'DetailType': 'Order Placed',
            'Detail': json.dumps({
                'orderId': 'order-12345',
                'customerId': 'cust-789',
                'amount': 99.99
            }),
            'EventBusName': 'custom-event-bus'
        }
    ]
)

EventBridge vs SNS:

| Feature          | EventBridge                                | SNS                                |
|------------------|--------------------------------------------|------------------------------------|
| Routing          | Content-based filtering (complex patterns) | Attribute-based filtering (simple) |
| Targets          | 20+ AWS services directly                  | 6 subscription types               |
| Schema Registry  | ✅ Built-in                                | ❌ Not available                   |
| Archive/Replay   | ✅ Native support                          | ❌ Not available                   |
| SaaS Integration | ✅ Partner event buses                     | ❌ Manual only                     |
| Throughput       | Moderate (soft limits)                     | Very high                          |

💡 When to use EventBridge: Complex routing logic, AWS service integration, schema management, event replay requirements

💡 When to use SNS: Simple fan-out, high throughput, SMS/email/push notifications

Message Durability and Delivery Guarantees 🔒

At-most-once delivery: Message delivered zero or one time (may be lost)

  • Use case: Telemetry data where occasional loss is acceptable

At-least-once delivery: Message delivered one or more times (may duplicate)

  • SQS Standard: Uses this model
  • SNS: Retry delivery with exponential backoff
  • Consumer must implement idempotency (processing same message multiple times has same effect as once)

Exactly-once delivery: Message delivered exactly one time

  • SQS FIFO: Provides exactly-once processing within the 5-minute deduplication interval
  • Requires deduplication (based on message ID or content hash)
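The content-hash approach can be sketched as follows. The 5-minute window mirrors the FIFO deduplication interval, though this in-memory `seen` cache is purely illustrative (FIFO queues track deduplication IDs on the service side):

```python
import hashlib

DEDUP_INTERVAL = 300  # seconds: FIFO queues deduplicate within a 5-minute window

def dedup_id(body):
    # Content-based deduplication ID: SHA-256 of the message body
    return hashlib.sha256(body.encode()).hexdigest()

def accept(body, now, seen):
    """Return True to enqueue, False if a duplicate arrived inside the window."""
    mid = dedup_id(body)
    last = seen.get(mid)
    if last is not None and now - last < DEDUP_INTERVAL:
        return False  # same content seen recently: drop as duplicate
    seen[mid] = now
    return True
```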

Idempotency Strategies:

import hashlib
import redis

# Track processed messages
redis_client = redis.Redis()

def process_message_idempotent(message):
    # Generate unique message ID
    message_id = hashlib.sha256(
        message['Body'].encode()
    ).hexdigest()
    
    # Check if already processed
    if redis_client.exists(f"processed:{message_id}"):
        print(f"Message {message_id} already processed, skipping")
        return
    
    # Process the message
    result = process_order(message)
    
    # Mark as processed (with TTL to prevent infinite growth)
    redis_client.setex(
        f"processed:{message_id}",
        86400,  # 24 hours
        "1"
    )
    
    return result

Real-World Examples 🌍

Example 1: E-Commerce Order Processing Pipeline

Scenario: An online store needs to coordinate multiple services when an order is placed: charge payment, update inventory, send confirmation email, and trigger fulfillment.

Architecture:

┌──────────────────────────────────────────────────────┐
│                  ORDER PROCESSING                    │
└──────────────────────────────────────────────────────┘

[Web App] ──places order──→ [API Gateway + Lambda]
                                    │
                                    ↓ publishes event
                            ┌───────────────┐
                            │  SNS Topic    │
                            │ "OrderPlaced" │
                            └───────┬───────┘
                                    │
          ┌─────────────────────────┼─────────────────────────┐
          │                         │                         │
          ↓                         ↓                         ↓
    ┌──────────┐              ┌──────────┐              ┌──────────┐
    │   SQS    │              │   SQS    │              │  Lambda  │
    │ Payment  │              │Inventory │              │  Email   │
    └────┬─────┘              └────┬─────┘              └──────────┘
         │                         │                         │
         ↓                         ↓                         ↓
    [Lambda]                  [Lambda]               [SES/SendEmail]
    Charge Card               Decrement Stock        Confirmation
         │                         │
         └────→ Success? ←─────────┘
                    │
                    ↓
            ┌──────────────┐
            │  EventBridge │
            │ "OrderReady" │
            └──────┬───────┘
                   │
                   ↓
            [Step Functions]
            Fulfillment Workflow

Implementation:

// Order service publishes event
const publishOrderEvent = async (order) => {
    const sns = new AWS.SNS();
    
    await sns.publish({
        TopicArn: process.env.ORDER_TOPIC_ARN,
        Message: JSON.stringify({
            orderId: order.id,
            customerId: order.customerId,
            items: order.items,
            total: order.total,
            timestamp: Date.now()
        }),
        MessageAttributes: {
            orderValue: {
                DataType: 'Number',
                StringValue: order.total.toString()
            },
            customerType: {
                DataType: 'String',
                StringValue: order.customerType  // 'regular' or 'premium'
            }
        }
    }).promise();
};

// Payment processor (SQS consumer)
const processPayment = async (event) => {
    for (const record of event.Records) {
        const envelope = JSON.parse(record.body);         // SNS envelope (SQS subscribed to SNS)
        const snsMessage = JSON.parse(envelope.Message);  // original order payload
        
        try {
            // Charge payment (Stripe expects an integer amount in cents)
            const payment = await stripeClient.charges.create({
                amount: Math.round(snsMessage.total * 100),
                currency: 'usd',
                customer: snsMessage.customerId
            });
            
            // Publish success event to EventBridge
            await eventbridge.putEvents({
                Entries: [{
                    Source: 'com.store.payment',
                    DetailType: 'Payment Processed',
                    Detail: JSON.stringify({
                        orderId: snsMessage.orderId,
                        paymentId: payment.id
                    })
                }]
            }).promise();
            
        } catch (error) {
            // Payment failed - send to DLQ for retry
            console.error('Payment failed:', error);
            throw error;  // Lambda will retry
        }
    }
};

Benefits:

  • Scalability: Each service scales independently based on queue depth
  • Resilience: Payment failures don't block inventory updates
  • Flexibility: Easy to add new services (analytics, fraud detection) by subscribing to SNS topic

Example 2: Real-Time Monitoring and Alerting

Scenario: Monitor application metrics and trigger alerts when thresholds are exceeded, with different notification channels for different severity levels.

Architecture:

# Lambda function triggered by CloudWatch alarm via EventBridge
import boto3
import json

sns = boto3.client('sns')
eventbridge = boto3.client('events')

def lambda_handler(event, context):
    # EventBridge event from CloudWatch alarm
    alarm_name = event['detail']['alarmName']
    state = event['detail']['state']['value']
    metric = event['detail']['configuration']['metrics'][0]['metricStat']['metric']
    
    severity = determine_severity(metric['metricName'], state)
    
    # Route to appropriate notification channel
    if severity == 'CRITICAL':
        # Page on-call engineer via SNS β†’ SMS
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789:critical-alerts',
            Message=f"CRITICAL: {alarm_name} - {state}",
            Subject='Critical System Alert'
        )
        
        # Also create PagerDuty incident via EventBridge
        eventbridge.put_events(
            Entries=[{
                'Source': 'com.monitoring.alerts',
                'DetailType': 'Critical Alert',
                'Detail': json.dumps({
                    'severity': 'critical',
                    'alarm': alarm_name,
                    'metric': metric['metricName']
                })
            }]
        )
        
    elif severity == 'WARNING':
        # Post to Slack channel via SNS β†’ Lambda β†’ Slack API
        sns.publish(
            TopicArn='arn:aws:sns:us-east-1:123456789:warning-alerts',
            Message=json.dumps({
                'alarm': alarm_name,
                'state': state,
                'metric': metric
            })
        )
    
    return {'statusCode': 200}

def determine_severity(metric_name, state):
    critical_metrics = ['CPUUtilization', 'DatabaseConnections', 'DiskSpace']
    if state == 'ALARM' and metric_name in critical_metrics:
        return 'CRITICAL'
    return 'WARNING'

EventBridge Rule Configuration:

{
  "source": ["aws.cloudwatch"],
  "detail-type": ["CloudWatch Alarm State Change"],
  "detail": {
    "state": {
      "value": ["ALARM"]
    }
  }
}

Example 3: Microservices Orchestration with Saga Pattern

Scenario: Book a trip requiring coordination between flight, hotel, and car rental services. If any service fails, compensating transactions must roll back the others.

Saga Orchestration using Step Functions + EventBridge:

SAGA PATTERN: Travel Booking

┌────────────────────────────────────────┐
│     Step Functions Workflow            │
├────────────────────────────────────────┤
│                                        │
│  1. Book Flight ──┐                    │
│        │         success               │
│        ↓          │                    │
│  2. Book Hotel ←──┘                    │
│        │         success               │
│        ↓          │                    │
│  3. Book Car   ←──┘                    │
│        │                               │
│     ┌──┴───┐                           │
│  success  failure                      │
│     │       │                          │
│     ↓       ↓                          │
│  Confirm  COMPENSATE:                  │
│  Booking  - Cancel Car                 │
│           - Cancel Hotel               │
│           - Cancel Flight              │
│                                        │
└────────────────────────────────────────┘

Implementation:

// Flight service publishes booking events
const bookFlight = async (bookingRequest) => {
    try {
        const reservation = await flightAPI.reserve(bookingRequest);
        
        // Publish success event
        await eventbridge.putEvents({
            Entries: [{
                Source: 'com.travel.flights',
                DetailType: 'Flight Booked',
                Detail: JSON.stringify({
                    bookingId: bookingRequest.sagaId,
                    flightId: reservation.id,
                    status: 'reserved'
                }),
                EventBusName: 'travel-event-bus'
            }]
        }).promise();
        
        return reservation;
        
    } catch (error) {
        // Publish failure event
        await eventbridge.putEvents({
            Entries: [{
                Source: 'com.travel.flights',
                DetailType: 'Flight Booking Failed',
                Detail: JSON.stringify({
                    bookingId: bookingRequest.sagaId,
                    error: error.message
                })
            }]
        }).promise();
        
        throw error;
    }
};

// Compensation handler listens for rollback events
const handleCompensation = async (event) => {
    const detail = event.detail;
    
    if (detail.compensateService === 'flights') {
        await flightAPI.cancel(detail.flightId);
        
        // Confirm compensation complete
        await eventbridge.putEvents({
            Entries: [{
                Source: 'com.travel.flights',
                DetailType: 'Flight Cancelled',
                Detail: JSON.stringify({
                    bookingId: detail.bookingId,
                    flightId: detail.flightId
                })
            }]
        }).promise();
    }
};

Example 4: Event Sourcing for Audit Trail

Scenario: Track all changes to customer accounts for compliance, with ability to reconstruct account state at any point in time.

Event Sourcing Pattern:

# All state changes are stored as events
import boto3
import json
from datetime import datetime

dynamodb = boto3.resource('dynamodb')
eventbridge = boto3.client('events')

class AccountEventStore:
    def __init__(self):
        self.events_table = dynamodb.Table('account-events')
    
    def publish_event(self, account_id, event_type, event_data):
        # Store event in DynamoDB
        event = {
            'accountId': account_id,
            'eventId': f"{account_id}-{datetime.now().timestamp()}",
            'eventType': event_type,
            'timestamp': datetime.now().isoformat(),
            'data': event_data
        }
        
        self.events_table.put_item(Item=event)
        
        # Publish to EventBridge for downstream processing
        eventbridge.put_events(
            Entries=[{
                'Source': 'com.banking.accounts',
                'DetailType': event_type,
                'Detail': json.dumps(event)
            }]
        )
        
        return event
    
    def get_account_state(self, account_id, at_timestamp=None):
        # Reconstruct account state by replaying events
        response = self.events_table.query(
            KeyConditionExpression='accountId = :id',
            ExpressionAttributeValues={':id': account_id}
        )
        
        state = {'balance': 0, 'status': 'active'}
        
        for event in sorted(response['Items'], key=lambda e: e['timestamp']):
            # Stop if we've reached the requested timestamp
            if at_timestamp and event['timestamp'] > at_timestamp:
                break
            
            # Apply event to state
            if event['eventType'] == 'AccountCreated':
                state = event['data']
            elif event['eventType'] == 'FundsDeposited':
                state['balance'] += event['data']['amount']
            elif event['eventType'] == 'FundsWithdrawn':
                state['balance'] -= event['data']['amount']
            elif event['eventType'] == 'AccountSuspended':
                state['status'] = 'suspended'
        
        return state

# Usage
store = AccountEventStore()

# Record a deposit
store.publish_event(
    account_id='ACC-12345',
    event_type='FundsDeposited',
    event_data={'amount': 500.00, 'source': 'wire-transfer'}  # use Decimal for DynamoDB numbers in production
)

# Get current state
current = store.get_account_state('ACC-12345')

# Get state as of a specific date (for audit)
historical = store.get_account_state('ACC-12345', at_timestamp='2024-01-01')

Benefits:

  • Complete audit trail: Every change is recorded
  • Time travel: Reconstruct state at any point
  • Debugging: Replay events to reproduce issues
  • Analytics: Process event stream for insights

Common Mistakes ⚠️

1. Not Implementing Idempotency

❌ Wrong:

def process_order(message):
    order = json.loads(message['Body'])
    # Dangerous: Will charge customer multiple times if message redelivered!
    charge_credit_card(order['customerId'], order['amount'])
    update_inventory(order['items'])

✅ Right:

def process_order(message):
    order = json.loads(message['Body'])
    order_id = order['orderId']
    
    # Check if already processed
    if dynamodb.get_item(Key={'orderId': order_id}).get('Item'):
        print(f"Order {order_id} already processed")
        return
    
    # Use idempotency key with payment provider
    charge_credit_card(
        customer=order['customerId'],
        amount=order['amount'],
        idempotency_key=order_id  # Prevents duplicate charges
    )
    
    update_inventory(order['items'])
    
    # Mark as processed
    dynamodb.put_item(Item={'orderId': order_id, 'processedAt': datetime.now().isoformat()})

2. Synchronous Processing of Messages

❌ Wrong:

// Lambda receiving 100 messages from SQS
exports.handler = async (event) => {
    for (const record of event.Records) {
        // Processes messages one by one sequentially
        await processMessage(record);  // Takes 2 seconds each = 200 seconds total!
    }
};

✅ Right:

exports.handler = async (event) => {
    // Process all messages concurrently
    await Promise.all(
        event.Records.map(record => processMessage(record))
    );  // Takes ~2 seconds total!
};

3. Not Setting Appropriate Visibility Timeouts

❌ Problem: Default 30-second visibility timeout, but processing takes 60 seconds. Message becomes visible again mid-processing, causing duplicate processing.

✅ Solution:

# Set queue visibility timeout to match Lambda timeout
sqs.create_queue(
    QueueName='processing-queue',
    Attributes={
        'VisibilityTimeout': '300',  # 5 minutes
        'ReceiveMessageWaitTimeSeconds': '20'  # Long polling
    }
)

# Or extend visibility during processing
receipt_handle = message['ReceiptHandle']
sqs.change_message_visibility(
    QueueUrl=queue_url,
    ReceiptHandle=receipt_handle,
    VisibilityTimeout=180  # Extend by 3 more minutes
)

4. Missing Dead Letter Queues

❌ Wrong: Messages fail repeatedly and eventually get deleted, losing data.

✅ Right:

# Configure DLQ with appropriate maxReceiveCount
sqs.create_queue(
    QueueName='main-queue',
    Attributes={
        'RedrivePolicy': json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': '3'  # Try 3 times then move to DLQ
        })
    }
)

# Monitor DLQ with CloudWatch alarm
cloudwatch.put_metric_alarm(
    AlarmName='DLQ-Messages-Alert',
    MetricName='ApproximateNumberOfMessagesVisible',
    Namespace='AWS/SQS',
    Dimensions=[{'Name': 'QueueName', 'Value': 'main-queue-dlq'}],
    Threshold=10,
    ComparisonOperator='GreaterThanThreshold'
)

5. Ignoring Message Ordering Requirements

❌ Wrong: Using Standard SQS queue for bank transactions where order matters.

✅ Right:

# Use FIFO queue with message group ID
sqs.send_message(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123456789/transactions.fifo',
    MessageBody=json.dumps(transaction),
    MessageGroupId=account_id,  # Orders messages per account
    MessageDeduplicationId=transaction_id  # Prevents duplicates
)

6. Not Using Long Polling

❌ Wrong: Short polling wastes API calls and costs money.

# Short polling - returns immediately even if no messages
messages = sqs.receive_message(QueueUrl=queue_url)  # Costs add up!

✅ Right:

# Long polling - waits up to 20 seconds for messages
messages = sqs.receive_message(
    QueueUrl=queue_url,
    WaitTimeSeconds=20,  # Reduces empty responses by 90%+
    MaxNumberOfMessages=10  # Batch processing
)

7. Overly Complex Event Patterns

❌ Wrong: Single event with 50 different fields trying to serve all consumers.

✅ Right: Specific event types for different purposes.

# Instead of one giant event:
# {'type': 'ORDER', 'status': 'placed', 'substatus': 'pending_payment', ...}

# Create specific events:
eventbridge.put_events(Entries=[
    {
        'Source': 'com.store.orders',
        'DetailType': 'Order Placed',
        'Detail': json.dumps({'orderId': 'ord-123', 'customerId': 'cust-456'})
    },
    {
        'Source': 'com.store.orders',
        'DetailType': 'Payment Required',
        'Detail': json.dumps({'orderId': 'ord-123', 'amount': 99.99})
    }
])

Key Takeaways 🎯

📋 Quick Reference Card

| Service      | When to Use                              | Key Feature                                     |
|--------------|------------------------------------------|-------------------------------------------------|
| SQS Standard | High throughput, order doesn't matter    | Unlimited TPS, at-least-once delivery           |
| SQS FIFO     | Order critical, no duplicates            | Exactly-once, guaranteed order                  |
| SNS          | Broadcast to multiple subscribers        | Fan-out pattern, SMS/email/push                 |
| EventBridge  | Complex routing, AWS service integration | Content filtering, schema registry, replay      |
| SNS + SQS    | Fan-out with buffering per consumer      | Best of both: broadcast + independent processing |

Essential Patterns:

  • 🔄 Fan-out: One message → many consumers (SNS)
  • 📊 Work queue: Distribute tasks among workers (SQS)
  • 🎯 Event routing: Complex conditional routing (EventBridge)
  • 🔁 Saga: Distributed transactions with compensation
  • 📝 Event sourcing: Immutable event log as source of truth

Best Practices:

  • ✅ Always implement idempotency for at-least-once delivery
  • ✅ Use long polling to reduce costs (WaitTimeSeconds=20)
  • ✅ Configure Dead Letter Queues to catch failures
  • ✅ Set visibility timeout > processing time
  • ✅ Use FIFO queues only when ordering is critical
  • ✅ Batch send/receive messages to reduce API calls
  • ✅ Use message attributes for filtering, not message body parsing
  • ✅ Monitor queue depth and DLQ with CloudWatch alarms

📚 Further Study

🧠 Memory Device for Service Selection:

"SQEEF" - SQS for Queues, SNS for Everyone, EventBridge for Everything else, FIFO when order matters

"The 3 A's" of messaging:

  • Asynchronous: Don't wait for responses
  • At-least-once: Prepare for duplicates with idempotency
  • Autonomous: Services work independently

Congratulations! You now understand AWS event-driven architecture and messaging services. Practice building decoupled systems using these patterns to create scalable, resilient applications. 🚀