Structured Logging Strategy
Learn event vs debug logs, correlation IDs, and when logs become your only source of truth
Master structured logging with free flashcards and spaced repetition practice. This lesson covers log formats, context enrichment, and query-driven design: essential concepts for building observable production systems that enable rapid troubleshooting and root cause analysis.
Welcome to Structured Logging
Imagine debugging a production incident at 2 AM. You're frantically searching through millions of log lines trying to find what went wrong. You see messages like:
Error occurred in handler
Processing failed
Unexpected result
Now imagine instead seeing:
{
"timestamp": "2026-01-15T02:34:12.456Z",
"level": "error",
"service": "payment-api",
"trace_id": "7f8a9b2c-4d3e-5f6g-7h8i-9j0k1l2m3n4o",
"user_id": "usr_12345",
"operation": "charge_card",
"error_type": "gateway_timeout",
"gateway": "stripe",
"amount_cents": 4999,
"retry_count": 2,
"duration_ms": 5023,
"message": "Payment gateway timeout after 5s"
}
Which one helps you understand the problem faster? Structured logging transforms logs from narrative text into queryable data, enabling you to filter, aggregate, and analyze production behavior at scale.
Why Structured Logging Matters
Traditional unstructured logs are essentially strings meant for human reading. They work fine for small systems but break down at scale:
❌ Problems with unstructured logs:
- Parsing requires brittle regex patterns
- Difficult to filter across multiple dimensions
- Aggregation and metrics extraction are painful
- Context is often missing or inconsistent
- Query performance degrades with volume
✅ Benefits of structured logs:
- Machine-readable: JSON, key-value pairs, or protobuf formats
- Queryable: Filter by any field instantly
- Consistent: Schema enforcement across services
- Contextual: Rich metadata travels with every log
- Analyzable: Easy to aggregate, count, and visualize
UNSTRUCTURED vs STRUCTURED

  "User john@example.com failed login from IP 192.168.1.5 at 14:32"
      → Hard to query users by country

                    │ TRANSFORM
                    ▼

  {
    "event": "login_failed",
    "user": "john@example.com",
    "ip": "192.168.1.5",
    "country": "US",
    "timestamp": "2026-01-15T14:32:00Z"
  }
      → Easy: WHERE country="US" AND event="login_failed"
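To make the transformation concrete, here is a minimal sketch in plain Python (standard library only; the log_event helper and the "auth-service" logger name are illustrative) that emits the structured login_failed record above as a single JSON line:

import json
import logging
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("auth-service")

def log_event(level, event, **fields):
    # Render one event as a single JSON line so it stays machine-readable
    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": level,
        "event": event,
        **fields,
    }
    logger.log(getattr(logging, level.upper()), json.dumps(entry))

# Emits the same structured record shown above
log_event("warn", "login_failed",
          user="john@example.com", ip="192.168.1.5", country="US")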
Core Components of Structured Logs
Every structured log entry should contain these fundamental elements:
1. Standard Fields (Always Include)
| Field | Purpose | Example |
|---|---|---|
| timestamp | When the event occurred | "2026-01-15T14:32:00.123Z" |
| level | Severity (debug/info/warn/error) | "error" |
| service | Which service emitted this | "payment-api" |
| message | Human-readable summary | "Payment processed successfully" |
2. Context Fields (Trace Through System)
| Field | Purpose | Example |
|---|---|---|
| trace_id | Distributed tracing identifier | "7f8a9b2c-4d3e-5f6g-7h8i" |
| span_id | Individual operation within trace | "9j0k1l2m3n4o" |
| user_id | Who triggered this | "usr_12345" |
| session_id | User session | "sess_abc123" |
| request_id | Unique per HTTP request | "req_xyz789" |
3. Domain-Specific Fields (Business Context)
These vary by operation but provide the "why" behind your logs:
{
"order_id": "ord_98765",
"product_sku": "LAPTOP-15-PRO",
"quantity": 2,
"total_amount_cents": 299800,
"payment_method": "credit_card",
"shipping_country": "US",
"warehouse": "warehouse-east-1"
}
💡 Tip: Choose field names that match your mental model when investigating issues. If you'd ask "Which warehouse was this order shipped from?" then include warehouse as a field.
Log Levels: When to Use Each
Choosing the right level is critical for signal-to-noise ratio:
| Level | Use Case | Volume | Example |
|---|---|---|---|
| DEBUG | Development troubleshooting | Very High | "Entering function calculateDiscount with cartValue=150" |
| INFO | Normal operations, state changes | High | "Order ord_123 transitioned to 'shipped' status" |
| WARN | Recoverable issues, degraded state | Medium | "Payment gateway latency elevated: 3.2s (threshold: 2s)" |
| ERROR | Failed operations requiring attention | Low | "Failed to charge card: insufficient_funds" |
Rule of thumb: If something happens thousands of times per second, it's INFO or DEBUG. If it should wake someone up at night, it's ERROR.
⚠️ Common mistake: Logging every function entry/exit at INFO level. This creates noise that drowns out important events. Reserve INFO for business events, not implementation details.
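As a rough sketch of the rule of thumb in practice, the snippet below uses structlog (introduced properly later in this lesson); the order-shipping scenario and the GatewayError type are made up for illustration:

import structlog

logger = structlog.get_logger()

class GatewayError(Exception):
    """Hypothetical payment-gateway error carrying an error code."""
    def __init__(self, code):
        super().__init__(code)
        self.code = code

def ship_order(order_id, warehouse, carrier_latency_ms):
    # DEBUG: implementation detail, useful only while developing
    logger.debug("inventory_lookup_started", order_id=order_id)
    # INFO: a business state change worth keeping in production
    logger.info("order_shipped", order_id=order_id, warehouse=warehouse)
    # WARN: degraded but recoverable condition
    if carrier_latency_ms > 2000:
        logger.warning("carrier_latency_elevated",
                       latency_ms=carrier_latency_ms, threshold_ms=2000)

def charge_card(order_id, amount_cents):
    try:
        raise GatewayError("gateway_timeout")  # stand-in for a real gateway call
    except GatewayError as e:
        # ERROR: a failed operation that should get someone's attention
        logger.error("payment_charge_failed",
                     order_id=order_id, amount_cents=amount_cents, error_code=e.code)
        raise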
Context Enrichment: The Secret Sauce
The power of structured logging comes from context propagation: automatically attaching relevant metadata to every log without manual effort.
Method 1: Logger Initialization with Base Context
# Create a logger with default context
logger = Logger(
service="payment-api",
version="1.2.3",
environment="production",
region="us-east-1"
)
# Every log from this logger includes base context automatically
logger.info("Service started", port=8080)
# Output: {"service":"payment-api","version":"1.2.3",...,"message":"Service started","port":8080}
Method 2: Request-Scoped Context (Middleware)
// Express middleware example
app.use((req, res, next) => {
// Create child logger with request context
req.logger = logger.child({
trace_id: req.headers['x-trace-id'],
request_id: generateRequestId(),
user_id: req.user?.id,
ip: req.ip,
user_agent: req.headers['user-agent']
});
next();
});
app.post('/api/orders', (req, res) => {
// All logs automatically include request context
req.logger.info('Creating order', {
product_count: req.body.items.length
});
// Output includes trace_id, request_id, user_id automatically
});
Method 3: Contextual Wrappers
// Go example using context.Context
ctx = WithLogFields(ctx, LogFields{
"order_id": order.ID,
"customer_id": order.CustomerID,
})
// Pass context through call chain
result, err := processPayment(ctx, order)
if err != nil {
// Log automatically includes order_id and customer_id
LogError(ctx, "Payment failed", "error", err)
}
CONTEXT PROPAGATION FLOW

HTTP Request
     │
     ▼
Middleware
[Adds: trace_id, user_id, session_id]
     │
     ▼
Controller
[Adds: order_id, operation="create_order"]
     │
     ▼
Service Layer
[Adds: payment_method, amount]
     │
     ▼
Database Call
[Adds: query_duration_ms, rows_affected]
     │
     ▼
Final Log Entry
{
  "trace_id": "abc123",            ← From middleware
  "user_id": "usr_456",            ← From middleware
  "order_id": "ord_789",           ← From controller
  "operation": "create_order",     ← From controller
  "payment_method": "card",        ← From service
  "amount": 4999,                  ← From service
  "query_duration_ms": 45,         ← From database
  "message": "Order created"
}
💡 Best Practice: Use context objects (like Go's context.Context or Python's contextvars) to automatically propagate fields through your call stack. This eliminates manual field passing.
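Here is a minimal sketch of that idea in Python with contextvars; the bind_fields and log helpers are illustrative, not any particular library's API:

import contextvars
import json

# Context-local log fields: each request or task sees its own copy
_log_fields = contextvars.ContextVar("log_fields", default={})

def bind_fields(**fields):
    # Merge new fields into the current context so later logs include them
    _log_fields.set({**_log_fields.get(), **fields})

def log(level, message, **fields):
    print(json.dumps({"level": level, "message": message,
                      **_log_fields.get(), **fields}))

bind_fields(trace_id="abc123", user_id="usr_456")          # middleware layer
bind_fields(order_id="ord_789", operation="create_order")  # controller layer

# Service layer: no manual passing, earlier fields arrive automatically
log("info", "Order created", payment_method="card", amount=4999)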
Query-Driven Design
The best logging strategy is one designed around the questions you'll ask during incidents. Work backwards from your queries:
Example 1: Performance Investigation
Question: "Which API endpoints are slowest for users in Europe?"
Required fields:
{
"endpoint": "/api/v1/search",
"duration_ms": 1543,
"region": "eu-west-1",
"user_country": "DE"
}
Query:
SELECT endpoint, p95(duration_ms)
FROM logs
WHERE user_country IN ('DE', 'FR', 'GB')
GROUP BY endpoint
ORDER BY p95(duration_ms) DESC
Example 2: Error Correlation
Question: "Are payment failures correlated with specific card types?"
Required fields:
{
"event": "payment_failed",
"error_code": "card_declined",
"card_brand": "visa",
"card_country": "US",
"decline_reason": "insufficient_funds"
}
Query:
SELECT card_brand, decline_reason, COUNT(*) as failures
FROM logs
WHERE event = 'payment_failed'
AND timestamp > NOW() - INTERVAL '1 hour'
GROUP BY card_brand, decline_reason
ORDER BY failures DESC
Example 3: User Journey Reconstruction
Question: "What did user X do before encountering the error?"
Required fields:
{
"trace_id": "abc123",
"user_id": "usr_456",
"event": "page_view",
"page": "/checkout",
"timestamp": "2026-01-15T14:32:00.123Z"
}
Query:
SELECT timestamp, service, event, page, message
FROM logs
WHERE user_id = 'usr_456'
AND timestamp BETWEEN '2026-01-15T14:00:00' AND '2026-01-15T15:00:00'
ORDER BY timestamp ASC
🧠 Mental Model: Every log field is a potential query dimension. If you can't imagine filtering or grouping by a field, consider whether you need it.
Practical Implementation Examples
Example 1: Python with structlog
import structlog
from datetime import datetime
# Configure structured logging
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
def process_payment(order_id, amount_cents, user_id):
# Bind context that persists for all logs in this function
log = logger.bind(
order_id=order_id,
user_id=user_id,
operation="process_payment"
)
log.info("payment_started", amount_cents=amount_cents)
try:
result = charge_card(amount_cents)
log.info(
"payment_completed",
amount_cents=amount_cents,
transaction_id=result.transaction_id,
duration_ms=result.duration_ms
)
return result
except PaymentError as e:
log.error(
"payment_failed",
amount_cents=amount_cents,
error_code=e.code,
error_message=str(e),
retry_count=e.retry_count
)
raise
Output:
{
"event": "payment_started",
"timestamp": "2026-01-15T14:32:00.123Z",
"level": "info",
"order_id": "ord_789",
"user_id": "usr_456",
"operation": "process_payment",
"amount_cents": 4999
}
Example 2: Go with zap
import (
    "context"
    "errors"
    "time"

    "go.uber.org/zap"
)
func ProcessOrder(ctx context.Context, order Order) error {
logger, _ := zap.NewProduction()
defer logger.Sync()
// Create child logger with order context
orderLogger := logger.With(
zap.String("order_id", order.ID),
zap.String("user_id", order.UserID),
zap.String("trace_id", GetTraceID(ctx)),
)
orderLogger.Info("order_processing_started",
zap.Int("item_count", len(order.Items)),
zap.Int64("total_cents", order.TotalCents),
)
start := time.Now()
err := validateInventory(ctx, order)
duration := time.Since(start)
if err != nil {
    fields := []zap.Field{
        zap.Error(err),
        zap.Int64("duration_ms", duration.Milliseconds()),
    }
    // InventoryError is a hypothetical error type carrying the out-of-stock SKUs;
    // extract it with errors.As so the SKUs can be logged as a structured field.
    var invErr *InventoryError
    if errors.As(err, &invErr) {
        fields = append(fields, zap.Strings("out_of_stock_skus", invErr.SKUs))
    }
    orderLogger.Error("inventory_validation_failed", fields...)
    return err
}
orderLogger.Info("inventory_validated",
zap.Int64("duration_ms", duration.Milliseconds()),
)
return nil
}
Example 3: Node.js with pino
const pino = require('pino');
const logger = pino({
level: process.env.LOG_LEVEL || 'info',
formatters: {
level: (label) => ({ level: label })
},
timestamp: () => `,"timestamp":"${new Date().toISOString()}"`
});
class OrderService {
constructor(baseLogger) {
this.logger = baseLogger.child({ service: 'order-service' });
}
async createOrder(userId, items) {
const orderId = generateOrderId();
const orderLogger = this.logger.child({
order_id: orderId,
user_id: userId,
operation: 'create_order'
});
orderLogger.info({
item_count: items.length,
total_cents: calculateTotal(items)
}, 'Order creation started');
try {
const order = await this.db.orders.insert({
id: orderId,
user_id: userId,
items: items
});
orderLogger.info({
status: 'created',
db_duration_ms: order.queryDuration
}, 'Order created successfully');
return order;
} catch (error) {
orderLogger.error({
error_type: error.constructor.name,
error_message: error.message,
error_code: error.code,
stack: error.stack
}, 'Order creation failed');
throw error;
}
}
}
Example 4: Logging Database Queries
import time
from functools import wraps
def log_query(logger):
"""Decorator to log database queries with performance metrics"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
query_start = time.time()
query_name = func.__name__
try:
result = func(*args, **kwargs)
duration_ms = (time.time() - query_start) * 1000
logger.info(
"database_query_completed",
query=query_name,
duration_ms=round(duration_ms, 2),
rows_returned=len(result) if hasattr(result, '__len__') else None,
status="success"
)
# Warn on slow queries
if duration_ms > 1000:
logger.warn(
"slow_query_detected",
query=query_name,
duration_ms=round(duration_ms, 2),
threshold_ms=1000
)
return result
except Exception as e:
duration_ms = (time.time() - query_start) * 1000
logger.error(
"database_query_failed",
query=query_name,
duration_ms=round(duration_ms, 2),
error_type=type(e).__name__,
error_message=str(e),
status="error"
)
raise
return wrapper
return decorator
# Usage
@log_query(logger)
def get_orders_by_user(user_id):
return db.execute(
"SELECT * FROM orders WHERE user_id = ?",
(user_id,)
)
Common Mistakes to Avoid ⚠️
1. Logging Sensitive Data
❌ NEVER log:
{
"password": "mypassword123",
"credit_card": "4532-1234-5678-9012",
"ssn": "123-45-6789",
"api_key": "sk_live_abc123"
}
✅ Instead:
{
"password_length": 13,
"card_last_4": "9012",
"card_brand": "visa",
"ssn_provided": true,
"api_key_prefix": "sk_live_***"
}
2. Inconsistent Field Names
❌ Inconsistent:
logger.info({ userId: "123" }); // camelCase
logger.info({ user_id: "123" }); // snake_case
logger.info({ UserID: "123" }); // PascalCase
logger.info({ userid: "123" }); // lowercase
✅ Pick one convention and enforce it:
// Choose snake_case and stick to it
logger.info({ user_id: "123" });
logger.info({ order_id: "456" });
logger.info({ trace_id: "789" });
3. Logging Without Context
❌ Useless:
logger.error("Payment failed")
You can't answer: Which user? Which order? Which payment gateway? What error?
✅ Actionable:
logger.error(
"payment_failed",
user_id=user.id,
order_id=order.id,
amount_cents=order.total,
gateway="stripe",
error_code=error.code,
error_message=error.message,
decline_reason=error.decline_code
)
4. Logging Too Much or Too Little
❌ Debug spam in production:
def calculate_discount(cart):
logger.debug(f"Entering calculate_discount") # Noise!
logger.debug(f"Cart has {len(cart.items)} items") # Noise!
for item in cart.items:
logger.debug(f"Processing item {item.id}") # Noise!
This creates millions of useless logs.
✅ Log meaningful events:
def calculate_discount(cart):
# Only log if something interesting happens
if cart.total > 100000: # $1000+
logger.info(
"high_value_cart_discount_applied",
user_id=cart.user_id,
total_cents=cart.total,
discount_percent=discount_rate
)
5. Missing Duration Metrics
❌ What happened:
logger.info("API call completed")
✅ How long it took:
start = time.time()
response = api_client.call()
duration_ms = (time.time() - start) * 1000
logger.info(
"api_call_completed",
endpoint=endpoint,
status_code=response.status_code,
duration_ms=round(duration_ms, 2),
retry_count=retry_count
)
Duration is critical for performance analysis!
6. String Concatenation Instead of Fields
❌ Can't query:
logger.info(f"User {user_id} purchased {product_name} for ${amount}")
You can't easily filter "all purchases over $100" because the amount is buried in a string.
✅ Queryable fields:
logger.info(
"purchase_completed",
user_id=user_id,
product_name=product_name,
amount_cents=amount * 100,
message=f"User {user_id} purchased {product_name}"
)
7. Not Using Standard Field Names
Use industry conventions when they exist:
| Concept | Standard Field | Not This |
|---|---|---|
| HTTP status | http.status_code | httpStatus |
| Request method | http.method | verb |
| Duration | duration_ms | time, elapsed |
| Error type | error.type | errorKind |
Using standards makes it easier to integrate with observability platforms.
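One lightweight way to move toward these conventions is a rename step applied before logs leave the service. The sketch below is illustrative; the alias map is an example, not a complete standard:

# Map team-specific names to the standard fields from the table above
FIELD_ALIASES = {
    "httpStatus": "http.status_code",
    "verb": "http.method",
    "time": "duration_ms",
    "elapsed": "duration_ms",
    "errorKind": "error.type",
}

def normalize_fields(entry):
    # Return a copy of the log entry with aliased keys renamed
    return {FIELD_ALIASES.get(key, key): value for key, value in entry.items()}

print(normalize_fields({"httpStatus": 503, "elapsed": 120.5, "path": "/api/orders"}))
# {'http.status_code': 503, 'duration_ms': 120.5, 'path': '/api/orders'}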
Schema Design and Evolution
Define Core Schema
Document your standard fields so all teams use them consistently:
# log-schema.yaml
standard_fields:
timestamp:
type: ISO8601
required: true
description: "When the event occurred"
level:
type: enum [debug, info, warn, error]
required: true
service:
type: string
required: true
description: "Service name from service registry"
trace_id:
type: string (UUID)
required: false
description: "Distributed trace identifier"
user_id:
type: string
required: false
description: "Authenticated user ID, null for anonymous"
event_schemas:
payment_processed:
fields:
order_id: string (required)
amount_cents: integer (required)
payment_method: enum [card, paypal, bank_transfer]
gateway: string (required)
transaction_id: string (required)
duration_ms: float (required)
order_created:
fields:
order_id: string (required)
user_id: string (required)
item_count: integer (required)
total_cents: integer (required)
shipping_country: string (ISO 3166-1 alpha-2)
Schema Validation
from jsonschema import validate, ValidationError
import json
# Load schema
with open('log-schema.json') as f:
schema = json.load(f)
class ValidatedLogger:
def __init__(self, logger, schema):
self.logger = logger
self.schema = schema
def info(self, event, **fields):
log_entry = {"event": event, **fields}
# Validate against schema
try:
validate(instance=log_entry, schema=self.schema)
except ValidationError as e:
# Log the validation error itself!
self.logger.error(
"log_schema_validation_failed",
event=event,
validation_error=str(e)
)
return
self.logger.info(event, **fields)
Schema Evolution Best Practices
- Always add, never remove: Old logs with the old schema still exist
- Make new fields optional: Don't break existing loggers
- Version your events: payment_processed_v2 if you need breaking changes
- Document deprecations: Give teams time to migrate
SCHEMA EVOLUTION

  V1: payment_processed
    - order_id
    - amount_cents
    - gateway
        │
        ▼  ADD FIELDS (safe)
  V1.1: payment_processed
    - order_id
    - amount_cents
    - gateway
    - currency (NEW, optional)
    - gateway_transaction_id (NEW, optional)
        │
        ▼  BREAKING CHANGE
  V2: payment_processed_v2
    - order_id
    - amount (changed: now an object)
      { value: 4999, currency: "USD" }
    - payment_gateway (renamed)
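At the call site, the safe and breaking paths above might look like the sketch below (structlog-style keyword fields; the values are illustrative):

import structlog

logger = structlog.get_logger()

# V1.1: additive change -- new fields are optional, existing consumers keep working
logger.info("payment_processed",
            order_id="ord_98765", amount_cents=4999, gateway="stripe",
            currency="USD",                    # NEW, optional
            gateway_transaction_id="txn_123")  # NEW, optional

# V2: breaking change -- emit under a new event name instead of mutating the old one
logger.info("payment_processed_v2",
            order_id="ord_98765",
            amount={"value": 4999, "currency": "USD"},  # changed: now an object
            payment_gateway="stripe")                   # renamed field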
Sampling and Cost Management
At scale, logging everything is expensive. Use strategic sampling:
1. Deterministic Sampling (by trace_id)
def should_sample_trace(trace_id, sample_rate=0.1):
"""Sample 10% of traces consistently"""
import hashlib
trace_hash = int(hashlib.md5(trace_id.encode()).hexdigest(), 16)
return (trace_hash % 100) < (sample_rate * 100)
if should_sample_trace(trace_id) or log_level >= INFO:
logger.debug("detailed_operation_info", ...)
2. Adaptive Sampling (sample errors more)
def get_sample_rate(level):
return {
'debug': 0.01, # Sample 1% of debug logs
'info': 0.1, # Sample 10% of info logs
'warn': 0.5, # Sample 50% of warnings
'error': 1.0 # Sample 100% of errors
}[level]
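A sketch of how these rates might gate emission, reusing get_sample_rate from above (the random gate and the example events are illustrative):

import random

import structlog

logger = structlog.get_logger()

def should_emit(level):
    # Keep a log line only if it wins the per-level sampling lottery
    return random.random() < get_sample_rate(level)

# Errors always pass (rate 1.0); most debug logs are dropped (rate 0.01)
if should_emit("error"):
    logger.error("payment_failed", order_id="ord_789")
if should_emit("debug"):
    logger.debug("cache_lookup", key="user:123")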
3. Rate Limiting (prevent log floods)
from collections import defaultdict
import time
class RateLimitedLogger:
def __init__(self, logger, max_per_minute=100):
self.logger = logger
self.max_per_minute = max_per_minute
self.counts = defaultdict(list)
def info(self, event, **fields):
now = time.time()
# Clean old timestamps
self.counts[event] = [
t for t in self.counts[event]
if now - t < 60
]
if len(self.counts[event]) < self.max_per_minute:
self.counts[event].append(now)
self.logger.info(event, **fields)
elif len(self.counts[event]) == self.max_per_minute:
# Log once that we're rate limiting
self.logger.warn(
"log_rate_limit_exceeded",
event=event,
limit=self.max_per_minute
)
self.counts[event].append(now)
💡 Cost-saving tip: Send high-volume, low-value logs (like DEBUG) to cheaper cold storage, and only send ERROR/WARN to your real-time analytics platform.
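One rough way to implement that split with the Python standard library is two handlers at different levels; the file name below stands in for a cheap cold-storage sink and the stream handler for a real-time shipper:

import logging

logger = logging.getLogger("payment-api")
logger.setLevel(logging.DEBUG)

# Placeholder cold-storage sink: accepts everything, including DEBUG
cold_storage = logging.FileHandler("all-events.jsonl")
cold_storage.setLevel(logging.DEBUG)

# Placeholder real-time analytics sink: WARN and above only
analytics = logging.StreamHandler()
analytics.setLevel(logging.WARNING)

logger.addHandler(cold_storage)
logger.addHandler(analytics)

logger.debug("cache_lookup key=user:123")     # cold storage only
logger.error("payment_failed order=ord_789")  # both destinations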
Integration with Observability Stack
Structured logs are most powerful when integrated with the full observability ecosystem:
                     OBSERVABILITY TRIAD

   METRICS              LOGS                TRACES
   (What's broken?)     (Why broken?)       (Where broken?)

   - Request rate       - Error msgs        - Request path
   - Error rate         - Stack traces      - Span timing
   - Latency p95        - Context data      - Dependencies

   Counter: 401         Log: "Invalid       Trace: shows
   errors in last       API key for         which service
   5 min                user X"             called which

      │                     │                    │
      └─────────────────────┼────────────────────┘
                            ▼
                  CORRELATED BY trace_id
Example incident flow:
1. Metric alert: "Payment error rate > 5%"
2. Trace: "Payment service calling gateway timeout"
3. Logs: "Gateway returned 504, circuit breaker opened"
Correlation Example
# Emit all three signals with same trace_id
def process_payment(trace_id, order_id, amount):
# Start trace span
with tracer.start_span("payment.process", trace_id=trace_id) as span:
span.set_attribute("order_id", order_id)
span.set_attribute("amount_cents", amount)
# Log with trace context
logger.info(
"payment_started",
trace_id=trace_id,
order_id=order_id,
amount_cents=amount
)
# Emit metric
metrics.counter("payments.started", tags={"currency": "USD"}).inc()
try:
result = gateway.charge(amount)
# Success: log, trace, metric all correlated
logger.info(
"payment_completed",
trace_id=trace_id,
order_id=order_id,
transaction_id=result.id,
duration_ms=result.duration
)
span.set_status("ok")
metrics.counter("payments.success").inc()
return result
except GatewayError as e:
# Error: all signals show the failure
logger.error(
"payment_failed",
trace_id=trace_id,
order_id=order_id,
error_code=e.code,
error_message=str(e)
)
span.set_status("error")
span.record_exception(e)
metrics.counter("payments.failed", tags={"error": e.code}).inc()
raise
Now when you get an alert from metrics, you can:
- Find the trace_id from the metric dimensions
- Pull up the trace to see the request flow
- Query logs for that trace_id to get detailed context
Key Takeaways
Structured Logging Quick Reference
| Principle | Implementation |
|---|---|
| Machine-readable format | Use JSON, not plain text |
| Consistent schema | Define and enforce standard fields |
| Rich context | Include trace_id, user_id, request_id |
| Query-driven design | Add fields you'll actually search by |
| Appropriate levels | ERROR for failures, INFO for events, DEBUG sparingly |
| Include duration | Always log timing for operations |
| Never log secrets | Mask sensitive data (PII, credentials) |
| Context propagation | Use middleware/context objects for automatic enrichment |
| Correlation | Link logs, traces, and metrics with trace_id |
| Cost management | Sample high-volume logs, rate limit floods |
Essential Fields Checklist:
- ✅ timestamp (ISO8601)
- ✅ level (debug/info/warn/error)
- ✅ service (which system)
- ✅ trace_id (correlation)
- ✅ message (human summary)
- ✅ event (machine-readable event type)
- ✅ duration_ms (for operations)
- ✅ error_type (for failures)
Remember: The goal isn't to log everything; it's to log the right things in a queryable format so you can quickly answer questions during incidents.
Further Study
- Structlog Documentation - Python structured logging best practices
- Uber's Zap Logger Design - High-performance Go logging architecture
- OpenTelemetry Logging Spec - Industry standard for log correlation with traces