Advanced Architecture & AI
Master multi-region patterns, data analytics, AI services, and professional-level AWS architecture
Advanced AWS Architecture and AI Integration
Master AWS advanced architecture patterns and AI service integration with free flashcards and spaced repetition practice. This lesson covers serverless AI pipelines, multi-region architectures, cost optimization strategies, and real-time ML inference: essential concepts for AWS Solutions Architect Professional certification and building production-scale cloud systems.
Welcome to Advanced AWS Architecture & AI
Welcome to the cutting edge of cloud architecture! In this lesson, you'll explore how to design sophisticated AWS systems that leverage artificial intelligence, maintain high availability across regions, and optimize costs at scale. Whether you're preparing for the AWS Solutions Architect Professional exam or architecting production systems, these patterns will elevate your cloud engineering skills.
What You'll Learn:
- Serverless AI Architectures: Building event-driven ML pipelines
- Multi-Region Strategies: Active-active and active-passive patterns
- Cost Optimization: Right-sizing, reserved capacity, and spot instances
- Real-Time ML Inference: Low-latency prediction architectures
- Security & Compliance: Encryption, IAM policies, and audit trails
Core Concepts
1. Serverless AI Pipeline Architecture
Serverless architectures eliminate infrastructure management while providing automatic scaling. When combined with AWS AI services, you can build powerful ML pipelines without managing servers.
Key Components:
| Service | Purpose | Use Case |
|---|---|---|
| Lambda | Compute orchestration | Data preprocessing, inference triggering |
| SageMaker | ML model training/hosting | Custom models, batch transform |
| Step Functions | Workflow coordination | Multi-stage ML pipelines |
| S3 | Data storage | Training data, model artifacts |
| EventBridge | Event routing | Trigger pipelines on file upload |
Architecture Pattern:
SERVERLESS AI PIPELINE

S3 Upload
    |
    v
EventBridge Rule
    |
    v
Lambda (Preprocessor)
    |
    v
SageMaker Batch Transform
    |
    v
Results to S3
    |
    v
Lambda (Postprocessor)
    |
    v
DynamoDB Results
    |
    v
SNS Notification
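The trigger wiring at the top of this pipeline takes only a few boto3 calls. The sketch below is a minimal illustration with hypothetical names (an image-uploads bucket and a preprocess-images function); it assumes the Lambda function already exists and that the EventBridge invoke permission on the function is granted separately.

import json
import boto3

s3 = boto3.client('s3')
events = boto3.client('events')

BUCKET = 'image-uploads'  # hypothetical bucket name
LAMBDA_ARN = 'arn:aws:lambda:us-east-1:123456789012:function:preprocess-images'  # hypothetical

# 1. Tell S3 to send object-level events to EventBridge
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={'EventBridgeConfiguration': {}}
)

# 2. Rule that matches "Object Created" events from that bucket
events.put_rule(
    Name='trigger-preprocessor',
    EventPattern=json.dumps({
        'source': ['aws.s3'],
        'detail-type': ['Object Created'],
        'detail': {'bucket': {'name': [BUCKET]}}
    })
)

# 3. Route matching events to the preprocessing Lambda
#    (lambda:AddPermission for events.amazonaws.com is still required)
events.put_targets(
    Rule='trigger-preprocessor',
    Targets=[{'Id': 'preprocessor', 'Arn': LAMBDA_ARN}]
)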
Pro Tip: Use Lambda Layers to package common ML libraries (NumPy, Pandas) and share them across multiple functions, reducing deployment package size and improving cold start times.
2. Multi-Region Architecture Patterns
Multi-region architectures provide disaster recovery, reduced latency for global users, and compliance with data residency requirements.
Active-Passive Pattern:
PRIMARY REGION                          STANDBY REGION
us-east-1                               eu-west-1
----------------------                  ----------------------
Serves traffic                          Warm standby
Active RDS        --read replica---->   RDS Read Replica
DynamoDB          --Global Tables--->   DynamoDB
S3                --S3 replication-->   S3
Lambda                                  Lambda (idle)
        |
        v
    Route 53
  Health Check
        |
        +--(failover)--> routes traffic to the standby region if the primary fails
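In the active-passive pattern, failover is driven by a Route 53 health check attached to the primary record. A minimal sketch of the two failover records, assuming hypothetical zone and health-check IDs and ALB DNS names:

import boto3

route53 = boto3.client('route53')

HOSTED_ZONE_ID = 'Z1234567890ABC'          # hypothetical
PRIMARY_HEALTH_CHECK_ID = 'hc-primary-id'  # hypothetical health check on us-east-1

route53.change_resource_record_sets(
    HostedZoneId=HOSTED_ZONE_ID,
    ChangeBatch={'Changes': [
        {   # Primary record: answered while the health check passes
            'Action': 'UPSERT',
            'ResourceRecordSet': {
                'Name': 'api.example.com', 'Type': 'CNAME', 'TTL': 60,
                'SetIdentifier': 'primary', 'Failover': 'PRIMARY',
                'HealthCheckId': PRIMARY_HEALTH_CHECK_ID,
                'ResourceRecords': [{'Value': 'us-east-alb.example.com'}]
            }
        },
        {   # Secondary record: only returned when the primary is unhealthy
            'Action': 'UPSERT',
            'ResourceRecordSet': {
                'Name': 'api.example.com', 'Type': 'CNAME', 'TTL': 60,
                'SetIdentifier': 'secondary', 'Failover': 'SECONDARY',
                'ResourceRecords': [{'Value': 'eu-west-alb.example.com'}]
            }
        }
    ]}
)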
Active-Active Pattern:
                          Route 53
                (geolocation or latency routing)
                     /                 \
                    v                   v
   us-east-1 (serves US)             eu-west-1 (serves EU)
   Aurora Global     <--- replication --->   Aurora Global
   DynamoDB Global   <--- multi-write --->   DynamoDB Global
   CloudFront                                CloudFront
When to Use Each:
| Pattern | RTO | RPO | Cost | Best For |
|---|---|---|---|---|
| Active-Passive | Minutes | Seconds | Lower | Cost-sensitive, regional apps |
| Active-Active | Seconds | Near-zero | Higher | Global users, zero-downtime |
Common Mistake: Forgetting to test failover regularly. Run GameDays where you intentionally fail over to verify that your disaster recovery procedures work.
3. Real-Time ML Inference Architecture
Low-latency prediction systems require careful architecture to meet SLA requirements (typically <100ms response time).
Architecture Layers:
REAL-TIME INFERENCE

API Gateway (REST/WebSocket)
    |
    v
Lambda (Input Validation)
    |
    +------------------+------------------+
    v                  v                  v
ElastiCache       Model Serving       SageMaker
(Feature Cache)   (Lambda Container)  (Endpoint)
    |                  |                  |
    +------------------+------------------+
                       |
                       v
                  Prediction
                       |
          +------------+------------+
          v                         v
     DynamoDB                Kinesis Firehose
     (Results)               (Logging/Analytics)
Optimization Strategies:
- Model Serving Options:
| Method | Latency | Cost | Scalability |
|---|---|---|---|
| Lambda Container | 10-50ms | Low | Automatic |
| SageMaker Endpoint | 5-20ms | Medium | Manual/Auto |
| ECS Fargate | 5-15ms | Medium | Manual |
| EKS (Kubernetes) | 3-10ms | Higher | Complex |
- Caching Strategy:
import boto3
import hashlib
import json
from redis import Redis

redis_client = Redis(host='elasticache-endpoint', port=6379)
sagemaker = boto3.client('sagemaker-runtime')

def lambda_handler(event, context):
    input_data = event['body']
    # Use a stable hash for the cache key (Python's built-in hash() is not
    # stable across processes because of hash randomization)
    cache_key = f"prediction:{hashlib.sha256(input_data.encode()).hexdigest()}"

    # Check cache first
    cached = redis_client.get(cache_key)
    if cached:
        return {
            'statusCode': 200,
            'body': json.dumps({'prediction': cached.decode(), 'cached': True})
        }

    # Invoke SageMaker endpoint
    response = sagemaker.invoke_endpoint(
        EndpointName='my-model-endpoint',
        ContentType='application/json',
        Body=input_data
    )
    prediction = response['Body'].read().decode()

    # Cache result (TTL: 1 hour)
    redis_client.setex(cache_key, 3600, prediction)

    return {
        'statusCode': 200,
        'body': json.dumps({'prediction': prediction, 'cached': False})
    }
Pro Tip: Use SageMaker Multi-Model Endpoints to host multiple models on a single endpoint, reducing costs by 50-70% when you have many models with sporadic traffic.
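With a multi-model endpoint in place, callers select the model per request using the TargetModel parameter. A minimal sketch, assuming a hypothetical endpoint name and model artifact:

import boto3

runtime = boto3.client('sagemaker-runtime')

# TargetModel picks one artifact out of the many hosted on the same endpoint
response = runtime.invoke_endpoint(
    EndpointName='shared-multi-model-endpoint',  # hypothetical
    TargetModel='customer-churn-v3.tar.gz',      # hypothetical model artifact
    ContentType='application/json',
    Body='{"features": [0.2, 1.7, 3.4]}'
)
print(response['Body'].read().decode())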
4. Cost Optimization Strategies
AWS costs can spiral without proper optimization. Here are battle-tested strategies:
Compute Optimization:
| Strategy | Savings | Tradeoffs |
|---|---|---|
| Reserved Instances | 40-60% | 1-3 year commitment |
| Savings Plans | 30-50% | $/hour commitment |
| Spot Instances | 60-90% | Can be interrupted |
| Lambda vs EC2 | Varies | Workload dependent |
| Graviton Processors | 20-40% | ARM architecture |
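Right-sizing recommendations from the first row of this table can be pulled programmatically from AWS Compute Optimizer, assuming the account has opted in to the service. A minimal sketch; the response field names follow the Compute Optimizer API as I understand it:

import boto3

optimizer = boto3.client('compute-optimizer')

# List EC2 instances Compute Optimizer considers over- or under-provisioned
response = optimizer.get_ec2_instance_recommendations()
for rec in response['instanceRecommendations']:
    current = rec['currentInstanceType']
    finding = rec['finding']  # e.g. OVER_PROVISIONED, UNDER_PROVISIONED, OPTIMIZED
    options = [o['instanceType'] for o in rec['recommendationOptions'][:3]]
    print(f"{rec['instanceArn']}: {current} is {finding}; consider {options}")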
Storage Tiering:
S3 LIFECYCLE POLICIES

S3 Standard                 (active data, <30 days)    $0.023/GB
    |
    v
S3 IA (Infrequent Access)   (30-90 days)               $0.0125/GB
    |
    v
S3 Glacier Flexible         (90-365 days)              $0.004/GB
    |
    v
S3 Glacier Deep Archive     (>365 days)                $0.00099/GB
    |
    v
Delete (7-year retention met)
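The tiering above maps directly to an S3 lifecycle configuration. A minimal sketch, using a hypothetical bucket name and the transition days shown in the diagram:

import boto3

s3 = boto3.client('s3')

s3.put_bucket_lifecycle_configuration(
    Bucket='my-archive-bucket',  # hypothetical
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'tier-and-expire',
            'Status': 'Enabled',
            'Filter': {'Prefix': ''},  # apply to every object
            'Transitions': [
                {'Days': 30,  'StorageClass': 'STANDARD_IA'},
                {'Days': 90,  'StorageClass': 'GLACIER'},
                {'Days': 365, 'StorageClass': 'DEEP_ARCHIVE'}
            ],
            'Expiration': {'Days': 2555}  # roughly 7 years
        }]
    }
)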
Cost Monitoring Architecture:
import boto3
from datetime import datetime, timedelta

ce = boto3.client('ce')  # Cost Explorer

def analyze_costs():
    end = datetime.now().date()
    start = end - timedelta(days=30)

    response = ce.get_cost_and_usage(
        TimePeriod={
            'Start': str(start),
            'End': str(end)
        },
        Granularity='DAILY',
        Metrics=['UnblendedCost'],
        GroupBy=[
            {'Type': 'DIMENSION', 'Key': 'SERVICE'},
            {'Type': 'TAG', 'Key': 'Environment'}
        ]
    )

    for result in response['ResultsByTime']:
        date = result['TimePeriod']['Start']
        for group in result['Groups']:
            service = group['Keys'][0]
            cost = group['Metrics']['UnblendedCost']['Amount']
            print(f"{date} - {service}: ${float(cost):.2f}")

    return response
Memory Device: RIGS for cost optimization:
- Right-size your instances
- Identify idle resources
- Gravitate to cheaper services
- Spot and reserved capacity
5. Security & Compliance Architecture
Defense in Depth Strategy:
SECURITY LAYERS

CloudFront + WAF
    |  (DDoS protection, geo-blocking)
    v
API Gateway
    |  (Throttling, API keys)
    v
IAM + Cognito
    |  (Authentication, authorization)
    v
Lambda (Service-to-Service)
    |  (Execution role, resource policies)
    v
VPC Security Groups
    |  (Network-level firewall)
    v
Encrypted Storage
    |  (KMS encryption at rest)
    v
CloudTrail + GuardDuty
       (Audit logs, threat detection)
IAM Best Practices:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject"
      ],
      "Resource": "arn:aws:s3:::my-ml-bucket/predictions/*",
      "Condition": {
        "StringEquals": {
          "s3:x-amz-server-side-encryption": "aws:kms"
        },
        "IpAddress": {
          "aws:SourceIp": "10.0.0.0/16"
        }
      }
    },
    {
      "Effect": "Deny",
      "Action": "s3:*",
      "Resource": "*",
      "Condition": {
        "Bool": {
          "aws:SecureTransport": "false"
        }
      }
    }
  ]
}
Encryption Strategy:
| Layer | Method | Service |
|---|---|---|
| In Transit | TLS 1.2+ | ALB, CloudFront, API Gateway |
| At Rest | KMS encryption | S3, EBS, RDS, DynamoDB |
| Application | Client-side | Encryption SDK |
| Secrets | Rotation | Secrets Manager |
Common Mistake: Relying on the default AWS-managed KMS key. Create customer-managed keys (CMKs) for better control over rotation, access policies, and audit trails.
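A minimal sketch of creating a customer-managed key with automatic rotation and a friendly alias (the alias name is hypothetical):

import boto3

kms = boto3.client('kms')

# Customer-managed key: you own the key policy, rotation, and audit trail
key = kms.create_key(
    Description='ML pipeline encryption key',
    KeyUsage='ENCRYPT_DECRYPT'
)
key_id = key['KeyMetadata']['KeyId']

# Annual automatic rotation
kms.enable_key_rotation(KeyId=key_id)

# Alias so applications never hard-code the key ID
kms.create_alias(AliasName='alias/ml-pipeline', TargetKeyId=key_id)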
Detailed Examples
Example 1: Image Classification Pipeline with Auto-Scaling
Scenario: Build a serverless pipeline that processes uploaded images, classifies them using a SageMaker model, and scales automatically based on queue depth.
Architecture:
IMAGE CLASSIFICATION PIPELINE

User Upload --> S3 Bucket
                    |
                    v
                S3 Event
                    |
                    v
                SQS Queue
                    |
       +------------+------------+
       v            v            v
    Lambda       Lambda       Lambda
   (Consumer)   (Consumer)   (Consumer)
       |            |            |
       +------------+------------+
                    |
                    v
          SageMaker Endpoint
        (Image Classification)
                    |
                    v
             DynamoDB Table
          (Results + Metadata)
                    |
                    v
               SNS Topic
             (Notification)
Step 1: S3 Bucket with Event Notifications
import boto3
import json

s3 = boto3.client('s3')
sqs = boto3.client('sqs')

# Create bucket
bucket_name = 'image-classification-uploads'
s3.create_bucket(Bucket=bucket_name)

# Configure event notifications to SQS.
# Note: S3 allows at most one suffix rule per configuration, so .jpg and .png
# each get their own configuration pointing at the same queue.
queue_arn = 'arn:aws:sqs:us-east-1:123456789012:image-queue'

notification_config = {
    'QueueConfigurations': [
        {
            'QueueArn': queue_arn,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': '.jpg'}]}}
        },
        {
            'QueueArn': queue_arn,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': '.png'}]}}
        }
    ]
}

s3.put_bucket_notification_configuration(
    Bucket=bucket_name,
    NotificationConfiguration=notification_config
)
Step 2: Lambda Consumer with Auto-Scaling
import boto3
import json
from datetime import datetime, timezone
from io import BytesIO

from PIL import Image

s3 = boto3.client('s3')
sagemaker = boto3.client('sagemaker-runtime')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('ImageClassifications')

ENDPOINT_NAME = 'image-classifier-endpoint'

def lambda_handler(event, context):
    for record in event['Records']:
        # Parse the SQS message wrapping the original S3 event
        s3_event = json.loads(record['body'])
        bucket = s3_event['Records'][0]['s3']['bucket']['name']
        key = s3_event['Records'][0]['s3']['object']['key']

        # Download and preprocess image
        response = s3.get_object(Bucket=bucket, Key=key)
        image_data = response['Body'].read()
        image = Image.open(BytesIO(image_data))
        image = image.resize((224, 224))  # Model input size

        # Convert to bytes
        buffer = BytesIO()
        image.save(buffer, format='JPEG')
        image_bytes = buffer.getvalue()

        # Invoke SageMaker endpoint
        inference_response = sagemaker.invoke_endpoint(
            EndpointName=ENDPOINT_NAME,
            ContentType='application/x-image',
            Body=image_bytes
        )

        # Parse prediction
        result = json.loads(inference_response['Body'].read().decode())
        predicted_class = result['predictions'][0]['class']
        confidence = result['predictions'][0]['confidence']

        # Store in DynamoDB
        table.put_item(
            Item={
                'image_key': key,
                'predicted_class': predicted_class,
                'confidence': str(confidence),
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'request_id': context.aws_request_id
            }
        )

        print(f"Classified {key}: {predicted_class} ({confidence:.2%})")

    return {'statusCode': 200, 'body': 'Processing complete'}
Step 3: Auto-Scaling Configuration
import boto3

autoscaling = boto3.client('application-autoscaling')

# Register the function's provisioned concurrency as a scalable target.
# The ResourceId must reference an alias or version, e.g. a "live" alias.
autoscaling.register_scalable_target(
    ServiceNamespace='lambda',
    ResourceId='function:ImageClassifierConsumer:live',
    ScalableDimension='lambda:function:ProvisionedConcurrency',
    MinCapacity=5,
    MaxCapacity=100
)

# Create a target-tracking policy driven by SQS queue depth
autoscaling.put_scaling_policy(
    PolicyName='SQSQueueDepthScaling',
    ServiceNamespace='lambda',
    ResourceId='function:ImageClassifierConsumer:live',
    ScalableDimension='lambda:function:ProvisionedConcurrency',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 100.0,  # target messages per unit of provisioned concurrency
        'CustomizedMetricSpecification': {
            'MetricName': 'ApproximateNumberOfMessagesVisible',
            'Namespace': 'AWS/SQS',
            'Dimensions': [
                {'Name': 'QueueName', 'Value': 'image-queue'}
            ],
            'Statistic': 'Average'
        }
    }
)
Why This Works:
- S3 events trigger SQS, providing buffer against spikes
- SQS queue enables controlled processing rate
- Lambda concurrency scales with queue depth (the event source mapping sketch after this list shows the standard consumption path)
- DynamoDB provides fast, scalable storage for results
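In many deployments the consumption path is simply an SQS event source mapping, which handles polling and scaling for you; the provisioned-concurrency policy above is an additional lever on top of it. A minimal sketch of the mapping, with hypothetical names, assuming the function and queue already exist:

import boto3

lambda_client = boto3.client('lambda')

# Lambda polls the queue and invokes the consumer in batches;
# MaximumConcurrency caps how many concurrent invocations the queue can drive.
lambda_client.create_event_source_mapping(
    FunctionName='ImageClassifierConsumer',
    EventSourceArn='arn:aws:sqs:us-east-1:123456789012:image-queue',
    BatchSize=10,
    ScalingConfig={'MaximumConcurrency': 50}
)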
Example 2: Multi-Region Active-Active with DynamoDB Global Tables
Scenario: E-commerce platform serving global customers with <50ms latency and 99.99% availability.
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client('dynamodb')

def create_global_table():
    table_name = 'Orders'

    # Create table in primary region (us-east-1)
    try:
        dynamodb.create_table(
            TableName=table_name,
            KeySchema=[
                {'AttributeName': 'order_id', 'KeyType': 'HASH'},
                {'AttributeName': 'timestamp', 'KeyType': 'RANGE'}
            ],
            AttributeDefinitions=[
                {'AttributeName': 'order_id', 'AttributeType': 'S'},
                {'AttributeName': 'timestamp', 'AttributeType': 'N'}
            ],
            BillingMode='PAY_PER_REQUEST',
            StreamSpecification={
                'StreamEnabled': True,
                'StreamViewType': 'NEW_AND_OLD_IMAGES'
            }
        )
        print("Table created in us-east-1")

        # Wait for table to be active
        waiter = dynamodb.get_waiter('table_exists')
        waiter.wait(TableName=table_name)
    except ClientError as e:
        if e.response['Error']['Code'] == 'ResourceInUseException':
            print("Table already exists")
        else:
            raise

    # Create global table (adds replicas). Note: this legacy API expects an
    # identical table with streams enabled to exist in each replica region.
    dynamodb.create_global_table(
        GlobalTableName=table_name,
        ReplicationGroup=[
            {'RegionName': 'us-east-1'},
            {'RegionName': 'eu-west-1'},
            {'RegionName': 'ap-southeast-1'}
        ]
    )
    print("Global table created with 3 replicas")
    return table_name

def write_with_conflict_resolution(order_data):
    """
    Writes to the local region with version tracking
    for conflict resolution.
    """
    table = boto3.resource('dynamodb').Table('Orders')

    try:
        response = table.put_item(
            Item={
                'order_id': order_data['order_id'],
                'timestamp': order_data['timestamp'],
                'user_id': order_data['user_id'],
                'items': order_data['items'],
                'total': order_data['total'],
                'region': boto3.session.Session().region_name,
                'version': 1
            },
            ConditionExpression='attribute_not_exists(order_id) OR version < :new_version',
            ExpressionAttributeValues={
                ':new_version': 1
            }
        )
        return response
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            print("Conflict detected - order already exists with newer version")
        raise

# Route 53 configuration for geolocation routing
route53_config = {
    "HostedZoneId": "Z1234567890ABC",
    "Changes": [
        {
            "Action": "CREATE",
            "ResourceRecordSet": {
                "Name": "api.example.com",
                "Type": "A",
                "SetIdentifier": "US-East",
                "GeoLocation": {"ContinentCode": "NA"},
                "AliasTarget": {
                    "HostedZoneId": "Z35SXDOTRQ7X7K",
                    "DNSName": "us-east-alb.amazonaws.com",
                    "EvaluateTargetHealth": True
                }
            }
        },
        {
            "Action": "CREATE",
            "ResourceRecordSet": {
                "Name": "api.example.com",
                "Type": "A",
                "SetIdentifier": "EU-West",
                "GeoLocation": {"ContinentCode": "EU"},
                "AliasTarget": {
                    "HostedZoneId": "Z32O12XQLNTSW2",
                    "DNSName": "eu-west-alb.amazonaws.com",
                    "EvaluateTargetHealth": True
                }
            }
        }
    ]
}
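The dictionary above is only a definition; applying it is a single call. A minimal sketch, assuming the hosted zone ID in the config is valid for your account:

import boto3

route53 = boto3.client('route53')

# Submit both geolocation records in a single change batch
route53.change_resource_record_sets(
    HostedZoneId=route53_config['HostedZoneId'],
    ChangeBatch={'Changes': route53_config['Changes']}
)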
Performance Results:
- Latency: 15-45ms (99th percentile)
- Availability: 99.995% (exceeds SLA)
- Replication lag: <1 second
Example 3: Cost-Optimized ML Training with Spot Instances
Scenario: Train deep learning models with 70% cost savings using EC2 Spot Instances and SageMaker Managed Spot Training.
import boto3
import sagemaker
from sagemaker.tensorflow import TensorFlow

sess = sagemaker.Session()
role = 'arn:aws:iam::123456789012:role/SageMakerRole'

# Configure managed spot training
estimator = TensorFlow(
    entry_point='train.py',
    role=role,
    instance_count=4,
    instance_type='ml.p3.2xlarge',
    framework_version='2.11',
    py_version='py39',
    # Enable managed spot training
    use_spot_instances=True,
    max_run=86400,   # 24 hours max
    max_wait=90000,  # Wait up to 25 hours (includes spot delays)
    # Checkpointing for interruption recovery
    checkpoint_s3_uri='s3://my-bucket/checkpoints',
    checkpoint_local_path='/opt/ml/checkpoints',
    # Hyperparameters
    hyperparameters={
        'epochs': 100,
        'batch_size': 64,
        'learning_rate': 0.001
    }
)

# Training script (train.py) with checkpointing; create_model(), train_data,
# and val_data are defined elsewhere in the real script.
train_script = '''
import tensorflow as tf
import os

checkpoint_dir = '/opt/ml/checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

# Load checkpoint if one exists (i.e. we were interrupted and restarted)
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
    print(f"Resuming from checkpoint: {latest_checkpoint}")
    model = tf.keras.models.load_model(latest_checkpoint)
    initial_epoch = int(latest_checkpoint.split('-')[-1])
else:
    model = create_model()
    initial_epoch = 0

# Save a checkpoint every epoch
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(checkpoint_dir, 'model-{epoch:03d}'),
    save_freq='epoch',
    save_best_only=False
)

# Train, resuming from the last completed epoch after an interruption
model.fit(
    train_data,
    validation_data=val_data,
    epochs=100,
    initial_epoch=initial_epoch,
    callbacks=[checkpoint_callback]
)
'''

# Start training job
estimator.fit({
    'training': 's3://my-bucket/training-data',
    'validation': 's3://my-bucket/validation-data'
})

# Monitor spot savings
def get_spot_savings(training_job_name):
    sagemaker_client = boto3.client('sagemaker')
    response = sagemaker_client.describe_training_job(
        TrainingJobName=training_job_name
    )

    billable_seconds = response['BillableTimeInSeconds']
    training_seconds = response['TrainingTimeInSeconds']

    # Spot saves roughly 70% on compute for this instance type
    on_demand_cost = billable_seconds * 3.06 / 3600  # ~$3.06/hour for ml.p3.2xlarge
    spot_cost = on_demand_cost * 0.3  # ~70% savings

    print(f"Training time: {training_seconds / 3600:.2f} hours")
    print(f"On-demand cost: ${on_demand_cost:.2f}")
    print(f"Spot cost: ${spot_cost:.2f}")
    print(f"Savings: ${on_demand_cost - spot_cost:.2f} (70%)")

    return spot_cost
Best Practices for Spot Training:
- Checkpoint frequently (every epoch minimum)
- Use max_wait > max_run to handle spot interruptions
- Choose flexible instance types (multiple spot pools)
- Monitor spot availability and pricing in target AZs (see the sketch after this list)
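One simple way to check recent spot pricing per Availability Zone before choosing where to train (the instance type is illustrative; swap in your training hardware):

import boto3
from datetime import datetime, timedelta

ec2 = boto3.client('ec2', region_name='us-east-1')

# Recent spot prices per Availability Zone for a GPU instance type
history = ec2.describe_spot_price_history(
    InstanceTypes=['p3.2xlarge'],
    ProductDescriptions=['Linux/UNIX'],
    StartTime=datetime.utcnow() - timedelta(hours=6)
)
for entry in history['SpotPriceHistory'][:10]:
    print(entry['AvailabilityZone'], entry['InstanceType'], entry['SpotPrice'])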
Example 4: Serverless Video Processing with Step Functions
Scenario: Process uploaded videos through transcoding, thumbnail generation, and content moderation pipeline.
import boto3
import json

stepfunctions = boto3.client('stepfunctions')

# State machine definition. Transcoding and content moderation are delegated to
# Lambda tasks (which call Elastic Transcoder and Rekognition respectively),
# since those services are not direct Step Functions optimized integrations.
state_machine_def = {
    "Comment": "Video processing pipeline",
    "StartAt": "ValidateVideo",
    "States": {
        "ValidateVideo": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ValidateVideo",
            "Next": "ParallelProcessing",
            "Catch": [{
                "ErrorEquals": ["InvalidVideoError"],
                "Next": "NotifyFailure"
            }]
        },
        "ParallelProcessing": {
            "Type": "Parallel",
            "Branches": [
                {
                    "StartAt": "TranscodeVideo",
                    "States": {
                        "TranscodeVideo": {
                            "Type": "Task",
                            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:TranscodeVideo",
                            "Parameters": {
                                "PipelineId": "1234567890123-abcdef",
                                "Input": {"Key.$": "$.video_key"},
                                "Outputs": [
                                    {"Key": "output.mp4", "PresetId": "1351620000001-000010"}
                                ]
                            },
                            "End": True
                        }
                    }
                },
                {
                    "StartAt": "GenerateThumbnail",
                    "States": {
                        "GenerateThumbnail": {
                            "Type": "Task",
                            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:GenerateThumbnail",
                            "End": True
                        }
                    }
                },
                {
                    "StartAt": "ModerateContent",
                    "States": {
                        "ModerateContent": {
                            "Type": "Task",
                            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:ModerateContent",
                            "Parameters": {
                                "Video": {
                                    "S3Object": {
                                        "Bucket": "my-videos",
                                        "Name.$": "$.video_key"
                                    }
                                }
                            },
                            "End": True
                        }
                    }
                }
            ],
            "Next": "AggregateResults"
        },
        "AggregateResults": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:us-east-1:123456789012:function:AggregateResults",
            "Next": "UpdateDatabase"
        },
        "UpdateDatabase": {
            "Type": "Task",
            "Resource": "arn:aws:states:::dynamodb:putItem",
            "Parameters": {
                "TableName": "VideoProcessingResults",
                "Item": {
                    "video_id": {"S.$": "$.video_id"},
                    "status": {"S": "completed"},
                    "results": {"S.$": "$.aggregated_results"}
                }
            },
            "Next": "NotifySuccess"
        },
        "NotifySuccess": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sns:publish",
            "Parameters": {
                "TopicArn": "arn:aws:sns:us-east-1:123456789012:VideoProcessingComplete",
                "Message.$": "$.video_id"
            },
            "End": True
        },
        "NotifyFailure": {
            "Type": "Task",
            "Resource": "arn:aws:states:::sns:publish",
            "Parameters": {
                "TopicArn": "arn:aws:sns:us-east-1:123456789012:VideoProcessingFailed",
                "Message.$": "$.error"
            },
            "End": True
        }
    }
}

# Create state machine
response = stepfunctions.create_state_machine(
    name='VideoProcessingPipeline',
    definition=json.dumps(state_machine_def),
    roleArn='arn:aws:iam::123456789012:role/StepFunctionsRole'
)
print(f"State machine created: {response['stateMachineArn']}")
# Lambda function for thumbnail generation.
# Assumes ffmpeg is available, e.g. via a Lambda layer or container image.
thumbnail_function = '''
import boto3
import subprocess

s3 = boto3.client('s3')

def lambda_handler(event, context):
    bucket = event['bucket']
    video_key = event['video_key']

    # Download video
    input_path = '/tmp/input.mp4'
    s3.download_file(bucket, video_key, input_path)

    # Generate thumbnail at the 5-second mark
    output_path = '/tmp/thumbnail.jpg'
    subprocess.run([
        'ffmpeg',
        '-i', input_path,
        '-ss', '00:00:05',
        '-vframes', '1',
        '-vf', 'scale=320:240',
        output_path
    ], check=True)

    # Upload thumbnail
    thumbnail_key = video_key.replace('.mp4', '_thumb.jpg')
    s3.upload_file(output_path, bucket, thumbnail_key)

    return {
        'thumbnail_key': thumbnail_key,
        'thumbnail_url': f"https://{bucket}.s3.amazonaws.com/{thumbnail_key}"
    }
'''
Why Step Functions:
- Visual workflows make complex pipelines understandable
- Built-in retry and error handling
- Service integrations reduce custom code
- Parallel execution speeds up processing
- State persistence enables long-running workflows
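To kick off the pipeline for an uploaded video, a minimal sketch (the ARN and input fields follow the hypothetical names used above):

import boto3
import json

stepfunctions = boto3.client('stepfunctions')

# Start one execution per uploaded video
stepfunctions.start_execution(
    stateMachineArn='arn:aws:states:us-east-1:123456789012:stateMachine:VideoProcessingPipeline',
    input=json.dumps({
        'video_id': 'vid-0001',
        'video_key': 'uploads/vid-0001.mp4',
        'bucket': 'my-videos'
    })
)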
Common Mistakes
Mistake 1: Not Handling Lambda Cold Starts
Problem:
import boto3
import tensorflow as tf  # Heavy import
import numpy as np

def lambda_handler(event, context):
    # Model is downloaded and loaded on every invocation - slow and expensive
    s3 = boto3.client('s3')
    s3.download_file('my-bucket', 'model.h5', '/tmp/model.h5')
    model = tf.keras.models.load_model('/tmp/model.h5')
    prediction = model.predict(event['data'])
    return prediction
Solution:
import os

import boto3
import numpy as np
import tensorflow as tf

# Download and load the model once, at module scope (cold start only)
MODEL_PATH = '/tmp/model.h5'
if not os.path.exists(MODEL_PATH):
    s3 = boto3.client('s3')
    s3.download_file('my-bucket', 'model.h5', MODEL_PATH)

model = tf.keras.models.load_model(MODEL_PATH)

def lambda_handler(event, context):
    # Model already loaded, fast execution
    prediction = model.predict(event['data'])
    return prediction
Additional tip: Use provisioned concurrency for predictable latency on critical endpoints.
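Provisioned concurrency is configured per alias or version; a minimal sketch with hypothetical names:

import boto3

lambda_client = boto3.client('lambda')

# Keep 10 execution environments initialized for the "live" alias
lambda_client.put_provisioned_concurrency_config(
    FunctionName='inference-handler',  # hypothetical function
    Qualifier='live',                  # alias or version
    ProvisionedConcurrentExecutions=10
)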
Mistake 2: Ignoring DynamoDB Partition Key Design
Problem:
# Bad: a single high-volume user_id concentrates all of its writes
# on one partition (hot partition)
table.put_item(
    Item={
        'user_id': 'user123',     # Partition key
        'timestamp': 1234567890,  # Sort key
        'action': 'login'
    }
)
Solution:
import random
import time

# Good: write sharding - append a random shard suffix so one hot user's
# writes spread across several partitions
def get_partition_key(user_id, shard_count=10):
    shard = random.randint(0, shard_count - 1)
    return f"{user_id}#{shard}"

table.put_item(
    Item={
        'partition_key': get_partition_key('user123'),  # e.g. "user123#7"
        'timestamp': int(time.time()),
        'user_id': 'user123',  # GSI partition key for queries
        'action': 'login'
    }
)
# Reads then query the GSI on user_id, or fan out across the shard suffixes
Mistake 3: Not Using SageMaker Batch Transform for Large Jobs
Problem: Using real-time endpoints for batch predictions wastes money.
Solution:
import sagemaker

transformer = sagemaker.transformer.Transformer(
    model_name='my-model',
    instance_count=5,
    instance_type='ml.m5.xlarge',
    output_path='s3://my-bucket/predictions',
    strategy='MultiRecord',  # Process multiple records per request
    max_payload=10,          # MB per request
    max_concurrent_transforms=100
)

transformer.transform(
    data='s3://my-bucket/input-data',
    data_type='S3Prefix',
    content_type='application/json',
    split_type='Line'
)
transformer.wait()
print("Batch predictions complete!")
Cost comparison:
- Real-time endpoint (24/7): $734/month
- Batch transform (4 hours/day): $49/month (93% savings!)
Mistake 4: Forgetting to Set TTL on DynamoDB Items
Problem: Old data accumulates, increasing storage costs.
Solution:
import time

import boto3

# Enable TTL on the table
dynamodb = boto3.client('dynamodb')
dynamodb.update_time_to_live(
    TableName='SessionData',
    TimeToLiveSpecification={
        'Enabled': True,
        'AttributeName': 'expiration_time'
    }
)

# Write items with an expiration timestamp (epoch seconds)
table = boto3.resource('dynamodb').Table('SessionData')
table.put_item(
    Item={
        'session_id': 'abc123',
        'user_id': 'user456',
        'data': {'key': 'value'},
        'expiration_time': int(time.time()) + 86400  # Expire in 24 hours
    }
)
Key Takeaways
Quick Reference Card
| Pattern | Use When | Key Services |
|---|---|---|
| Serverless AI | Variable workloads, rapid scaling | Lambda, SageMaker, Step Functions |
| Multi-Region Active-Passive | DR required, cost-sensitive | Route 53, S3 replication, RDS read replica |
| Multi-Region Active-Active | Global users, zero downtime | DynamoDB Global Tables, Aurora Global |
| Real-Time Inference | <50ms latency required | Lambda, ElastiCache, SageMaker Endpoints |
| Batch ML | Large datasets, flexible timing | SageMaker Batch Transform, Spot |
Cost Optimization Checklist:
- Right-size EC2 instances (use Compute Optimizer)
- Use Spot Instances for fault-tolerant workloads
- Implement S3 lifecycle policies
- Enable DynamoDB auto-scaling or on-demand capacity
- Use Lambda instead of always-on EC2 where possible
- Set up Cost Anomaly Detection alerts (a simple AWS Budgets alarm is sketched below)
- Tag all resources for cost allocation
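As a complement to Cost Anomaly Detection, a monthly budget with an email alert takes a few lines of boto3. A minimal sketch with hypothetical account ID, amount, and address:

import boto3

budgets = boto3.client('budgets')

# Alert when actual spend crosses 80% of a $1,000 monthly budget
budgets.create_budget(
    AccountId='123456789012',  # hypothetical
    Budget={
        'BudgetName': 'monthly-cost-guardrail',
        'BudgetLimit': {'Amount': '1000', 'Unit': 'USD'},
        'TimeUnit': 'MONTHLY',
        'BudgetType': 'COST'
    },
    NotificationsWithSubscribers=[{
        'Notification': {
            'NotificationType': 'ACTUAL',
            'ComparisonOperator': 'GREATER_THAN',
            'Threshold': 80.0,
            'ThresholdType': 'PERCENTAGE'
        },
        'Subscribers': [{'SubscriptionType': 'EMAIL', 'Address': 'ops@example.com'}]
    }]
)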
Security Best Practices:
- Encrypt everything (in transit and at rest)
- Use IAM roles, never long-lived access keys
- Enable CloudTrail and GuardDuty
- Implement least-privilege access
- Use VPC endpoints to avoid internet traffic (see the sketch after this list)
- Rotate secrets with Secrets Manager
- Enable MFA for console access
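For the VPC endpoint item above, a gateway endpoint for S3 keeps bucket traffic on the AWS network. A minimal sketch with hypothetical VPC and route table IDs:

import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

# Gateway endpoint: S3 traffic from the VPC never traverses the public internet
ec2.create_vpc_endpoint(
    VpcId='vpc-0123456789abcdef0',           # hypothetical
    ServiceName='com.amazonaws.us-east-1.s3',
    VpcEndpointType='Gateway',
    RouteTableIds=['rtb-0123456789abcdef0']  # hypothetical
)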
Memory Devices
SCALES - Principles of Serverless Architecture:
- Stateless functions
- Cost-effective (pay per use)
- Auto-scaling
- Loosely coupled
- Event-driven
- Simple to deploy
RADAR - Multi-Region Strategy:
- Replication (data sync)
- Availability zones
- Disaster recovery plans
- Auto-failover
- Route 53 health checks
Further Study
AWS Well-Architected Framework - https://aws.amazon.com/architecture/well-architected/
- Deep dive into reliability, security, performance efficiency, cost optimization, and operational excellence pillars
AWS Machine Learning Blog - https://aws.amazon.com/blogs/machine-learning/
- Real-world case studies, new service features, and architecture patterns
AWS Solutions Library - https://aws.amazon.com/solutions/
- Pre-built reference architectures and CloudFormation templates
Try This Next: Build a complete serverless ML pipeline: S3 → Lambda → SageMaker → DynamoDB. Deploy it in two regions with Route 53 failover. Monitor costs with Cost Explorer and set up billing alarms. This hands-on project will solidify all the concepts from this lesson!
Did you know? Netflix processes over 450 billion events per day on AWS, using architecture patterns similar to those covered in this lesson, and runs its entire infrastructure in the cloud with zero on-premises data centers!