Async Boundary Patterns

Handle context propagation across HTTP, message queues, background jobs, and batch processes

This lesson covers context propagation across asynchronous operations, thread pool boundaries, and message queues, all essential concepts for maintaining observability in distributed systems. Understanding how to preserve trace context when execution crosses async boundaries is critical for root cause analysis.

Welcome to Async Boundary Patterns 💻

Welcome to one of the most challenging aspects of distributed tracing! When your application spawns threads, queues messages, or delegates work to async executors, trace context doesn't automatically follow. You'll learn the architectural patterns that ensure observability signals survive these transitions.

In modern cloud-native applications, async operations are everywhere, from event-driven microservices to reactive programming models. Each async boundary represents a potential context break where your carefully propagated trace ID could vanish. This lesson equips you with battle-tested patterns to prevent those breaks.

Core Concepts: Understanding Async Boundaries 🔄

What is an Async Boundary?

An async boundary occurs whenever execution control transfers from one execution context to another. This includes:

  • Thread pool submissions 🧵 (ExecutorService, worker threads)
  • Message queue operations 📬 (Kafka, RabbitMQ, SQS)
  • Event loops ⚡ (Node.js, async/await)
  • Callback chains 🔗 (promises, futures, reactive streams)
  • Scheduled tasks ⏰ (cron jobs, delayed execution)

Synchronous Flow (context preserved):
┌──────────┐
│ Request  │
│  Thread  │───→ Method A ───→ Method B ───→ Method C
│          │         ↓              ↓              ↓
└──────────┘    [Trace ID: abc123 flows naturally]


Async Flow (context breaks without patterns):
┌──────────┐
│ Request  │───→ Submit Task ───X───→ ┌──────────┐
│  Thread  │                          │  Worker  │
│          │                          │  Thread  │
└──────────┘                          │          │
  Trace ID: abc123                    │ Trace ID: ??? ❌
                                      └──────────┘

Why Context Gets Lost

Thread-local storage (the common mechanism for context propagation) doesn't automatically transfer when work moves to a different thread. Each thread has its own isolated storage, so context must be explicitly captured and restored.
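
The isolation described above can be observed with nothing but the JDK. The sketch below is illustrative only: `ThreadLocalLossDemo` and its `TRACE_ID` slot are hypothetical stand-ins for whatever thread-local a tracing library keeps internally.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalLossDemo {
    // Hypothetical stand-in for a tracing library's thread-local context slot.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Returns the trace ID a pool thread sees when nothing propagates it. */
    static String traceIdSeenByWorker() {
        TRACE_ID.set("abc123"); // visible on the calling thread only
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> readOnWorker = TRACE_ID::get;
            return pool.submit(readOnWorker).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            TRACE_ID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller set abc123, worker sees: " + traceIdSeenByWorker());
    }
}
```

Running `main` reports that the worker sees `null`: the value set on the calling thread never reaches the pool thread's own slot.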

💡 Key Insight: Context propagation is a two-phase operation: capture in the originating context, then restore in the destination context. When the boundary also crosses a process, a serialization step sits between the two.

The Three Pillars of Async Context Propagation

Pillar    | Description                                            | Example
----------|--------------------------------------------------------|-----------------------------------------
Capture   | Extract context at async boundary entry point          | Snapshot trace ID before queue.send()
Serialize | Encode context for transport across boundary           | Add trace headers to message
Restore   | Rehydrate context in destination execution environment | Set thread-local from headers on receive

Pattern 1: Wrapper-Based Propagation 📦

The wrapper pattern decorates async tasks with context-aware wrappers that handle capture and restore automatically.

Implementation Strategy:

  1. Create a wrapper class that captures context at construction time
  2. Wrap the original task/runnable/callable
  3. Override execution method to restore context before delegating
  4. Clean up context after task completes

Wrapper Pattern Flow:

┌─────────────────────────────────────────────────┐
│  Originating Thread (Trace ID: abc123)          │
│                                                 │
│  Context ctx = getCurrentContext()  ──┐         │
│  Runnable wrapped = new               │         │
│    ContextAwareRunnable(task, ctx) ───┼─┐       │
│  executor.submit(wrapped)             │ │       │
└───────────────────────────────────────┼─┼───────┘
                                        │ │
                        Capture happens here
                                        ↓ ↓
┌─────────────────────────────────────────────────┐
│  Worker Thread (initially no context)           │
│                                                 │
│  wrapped.run() called:                          │
│    1. setContext(ctx) ←─────────────────┐       │
│    2. task.run()          Restore       │       │
│    3. clearContext()                    │       │
│       ↓                                 │       │
│    [Trace ID: abc123 active! ✓]         │       │
└─────────────────────────────────────────────────┘
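
The four implementation steps can be sketched with a plain JDK thread-local. Everything here is an assumption for illustration: `TRACE_ID` stands in for a real tracing context, and the cleanup step restores whatever the worker held before rather than blindly clearing it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class ContextAwareRunnable implements Runnable {
    // Hypothetical thread-local slot standing in for a tracing library's context.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private final Runnable delegate;
    private final String captured;

    ContextAwareRunnable(Runnable delegate) {
        this.delegate = delegate;
        this.captured = TRACE_ID.get(); // capture on the submitting thread
    }

    @Override
    public void run() {
        String previous = TRACE_ID.get(); // remember what the worker had
        TRACE_ID.set(captured);           // restore before delegating
        try {
            delegate.run();
        } finally {
            // clean up: leave the worker thread the way we found it
            if (previous == null) TRACE_ID.remove(); else TRACE_ID.set(previous);
        }
    }

    /** Submits a wrapped task and returns the trace ID observed inside it. */
    static String demo() {
        TRACE_ID.set("abc123");
        AtomicReference<String> seen = new AtomicReference<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(new ContextAwareRunnable(() -> seen.set(TRACE_ID.get()))).get();
            return seen.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            TRACE_ID.remove();
        }
    }
}
```

Calling `ContextAwareRunnable.demo()` returns the trace ID set on the submitting thread, because the wrapper carried it across to the worker.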

Advantages:

  • ✅ Explicit and easy to understand
  • ✅ Works with any executor framework
  • ✅ Can be applied selectively

Disadvantages:

  • ⚠️ Requires wrapping every async submission
  • ⚠️ Easy to forget in new code paths

Pattern 2: Instrumented Executor Services 🔧

Instead of wrapping every task, instrument the executor itself to automatically wrap submitted tasks.

Implementation Strategy:

  1. Create a custom executor that wraps a standard executor
  2. Override submit/execute methods to wrap tasks automatically
  3. All tasks submitted to this executor get context propagation for free

Instrumented Executor Architecture:

┌──────────────────────────────────────────┐
│   Application Code                       │
│                                          │
│   executor.submit(task)  ───────┐        │
└─────────────────────────────────┼────────┘
                                  │
                                  ↓
┌──────────────────────────────────────────┐
│   ContextPropagatingExecutor             │
│                                          │
│   @Override                              │
│   submit(task) {                         │
│     ctx = capture()                      │
│     wrapped = wrap(task, ctx)            │
│     return delegate.submit(wrapped)      │
│   }                    ↓                 │
└────────────────────────┼─────────────────┘
                         │
                         ↓
┌──────────────────────────────────────────┐
│   Underlying ThreadPoolExecutor          │
│                                          │
│   [Worker threads execute wrapped tasks] │
│   [Context automatically restored! ✓]    │
└──────────────────────────────────────────┘

Java Example Concept:

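import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Conceptual sketch: only submit(Runnable) is shown. A real implementation must
// also implement (and wrap tasks in) execute, submit(Callable), invokeAll, etc.
public abstract class TracingExecutorService implements ExecutorService {
    private final ExecutorService delegate;

    protected TracingExecutorService(ExecutorService delegate) {
        this.delegate = delegate;
    }

    @Override
    public Future<?> submit(Runnable task) {
        Context ctx = Context.current(); // Capture on the submitting thread
        Runnable wrapped = () -> {
            try (Scope scope = ctx.makeCurrent()) { // Restore on the worker thread
                task.run();
            }
        };
        return delegate.submit(wrapped);
    }
}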
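
For a version that runs without any tracing dependency, the same idea can be applied to the smaller `java.util.concurrent.Executor` interface, which has a single method to override. This is a sketch under stated assumptions: `TRACE_ID` is a hypothetical thread-local standing in for the real context.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextPropagatingExecutor implements Executor {
    // Hypothetical thread-local slot standing in for the tracing context.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    private final Executor delegate;

    ContextPropagatingExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public void execute(Runnable task) {
        String captured = TRACE_ID.get(); // capture on the caller's thread
        delegate.execute(() -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);       // restore on the worker thread
            try {
                task.run();
            } finally {
                if (previous == null) TRACE_ID.remove(); else TRACE_ID.set(previous);
            }
        });
    }

    /** Callers hand over plain tasks; the executor does the wrapping. */
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Executor tracing = new ContextPropagatingExecutor(pool);
        TRACE_ID.set("abc123");
        try {
            return CompletableFuture.supplyAsync(TRACE_ID::get, tracing).join();
        } finally {
            pool.shutdown();
            TRACE_ID.remove();
        }
    }
}
```

With OpenTelemetry itself, hand-rolling this is often unnecessary: the Java API provides `Context.taskWrapping(executorService)`, which returns an executor that wraps every submitted task with the context current at submission time.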

Advantages:

  • ✅ Centralized instrumentation
  • ✅ Works automatically for all submitted tasks
  • ✅ Harder to bypass accidentally

Disadvantages:

  • ⚠️ Requires executor replacement throughout codebase
  • ⚠️ May conflict with existing custom executors

Pattern 3: Reactive Context Propagation 🌊

Reactive frameworks (RxJava, Project Reactor, Kotlin Flow) use operator chains where context must flow through transformations.

The Challenge:

Reactive Stream Problem:

Publisher stream = 
    repository.findAll()              // Thread A
        .map(this::transform)         // Thread B (scheduler)
        .filter(this::validate)       // Thread C (scheduler)
        .flatMap(this::enrich);       // Thread D (I/O pool)

Operators may execute on different threads (for example after publishOn, or when flatMap subscribes to an asynchronous source).
Context must survive ALL hops.

Solution: Reactor Context

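Project Reactor provides a Context that is attached to the subscription and flows with the reactive pipeline rather than with a thread. It is written with contextWrite (the older subscriberContext operator was deprecated in Reactor 3.4 and removed in 3.5) and is visible to the operators upstream of the write, which read it through helpers such as Mono.deferContextual:

Reactor Context Flow:

    contextWrite(ctx → ctx.put("traceId", "abc123"))
                    ↓
            [Context attached to subscription]
                    ↓
    ┌───────────────┴───────────────┐
    │                               │
    ↓                               ↓
  map()                          flatMap()
  [reads ctx]                    [reads ctx]
  TraceId: abc123 ✓              TraceId: abc123 ✓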

Advantages:

  • ✅ Native to reactive paradigm
  • ✅ Immutable context prevents corruption
  • ✅ Automatic propagation through operators

Disadvantages:

  • ⚠️ Framework-specific (not portable)
  • ⚠️ Requires reactive programming model

Pattern 4: Message Queue Context Injection 📨

When crossing process boundaries via message queues, context must be serialized into message metadata.

The W3C Trace Context Standard:

The W3C Trace Context specification defines standard headers for distributed tracing:

  • traceparent: Version, trace ID, parent span ID, trace flags
  • tracestate: Vendor-specific context (optional)
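
Header      | Example Value                                           | Purpose
------------|---------------------------------------------------------|------------------
traceparent | 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01 | Core trace info
tracestate  | vendor1=value1,vendor2=value2                           | Extended metadata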
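
To make the four traceparent fields concrete, here is a minimal parser sketch for version 00 only. The `TraceParent` class is hypothetical (it is not part of any tracing library), and it deliberately rejects anything it does not understand, including the all-zero IDs that the specification treats as invalid.

```java
class TraceParent {
    final String version;
    final String traceId;  // 32 lowercase hex characters
    final String parentId; // 16 lowercase hex characters
    final int flags;       // 8-bit field; bit 0 is the "sampled" flag

    private TraceParent(String version, String traceId, String parentId, int flags) {
        this.version = version;
        this.traceId = traceId;
        this.parentId = parentId;
        this.flags = flags;
    }

    /** Parses a version-00 traceparent value, or returns null if it is malformed. */
    static TraceParent parse(String header) {
        if (header == null) return null;
        String[] p = header.trim().split("-");
        if (p.length != 4) return null;
        boolean valid = p[0].equals("00")
                && p[1].matches("[0-9a-f]{32}") && !p[1].matches("0{32}")
                && p[2].matches("[0-9a-f]{16}") && !p[2].matches("0{16}")
                && p[3].matches("[0-9a-f]{2}");
        if (!valid) return null;
        return new TraceParent(p[0], p[1], p[2], Integer.parseInt(p[3], 16));
    }

    boolean isSampled() {
        return (flags & 0x01) != 0;
    }

    /** Serializes back to the header form. */
    String format() {
        return String.format("%s-%s-%s-%02x", version, traceId, parentId, flags);
    }
}
```

Parsing the example value from the table yields trace ID `4bf92f3577b34da6a3ce929d0e0e4736`, parent span ID `00f067aa0ba902b7`, and a set sampled flag; `format()` round-trips to the same string.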

Implementation Pattern:

Message Queue Context Propagation:

┌─────────────────────────────────────────────────┐
│  Producer Service                               │
│                                                 │
│  Context ctx = getCurrentContext()              │
│  Message msg = new Message(payload)             │
│  msg.setHeader("traceparent",                   │
│                ctx.toTraceparent())  ──┐        │
│  queue.send(msg)                       │        │
└────────────────────────────────────────┼────────┘
                                         │
                        Serialized to message headers
                                         │
                                         ↓
              ┌──────────────────────────────┐
              │   Message Queue (Kafka)      │
              │                              │
              │   Topic: orders              │
              │   Headers:                   │
              │     traceparent: 00-4bf...   │
              └──────────────┬───────────────┘
                             │
                             ↓
┌─────────────────────────────────────────────────┐
│  Consumer Service                               │
│                                                 │
│  Message msg = queue.receive()                  │
│  String tp = msg.getHeader("traceparent")       │
│  Context ctx = Context.fromTraceparent(tp)      │
│  ctx.makeCurrent()  ←─────────┐                 │
│  processMessage(msg)          │                 │
│         ↓              Restore context          │
│  [Trace ID flows to consumer! ✓]                │
└─────────────────────────────────────────────────┘

💡 Pro Tip: Always inject context before sending and extract immediately upon receiving, before any business logic executes.
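
The inject-before-send, extract-before-processing ordering can be simulated in-process. The sketch below uses a `BlockingQueue` in place of a broker; the `Message` type and the `TRACEPARENT` thread-local are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueuePropagationDemo {
    // Hypothetical thread-local holding the active traceparent value.
    static final ThreadLocal<String> TRACEPARENT = new ThreadLocal<>();

    /** Hypothetical message type: a payload plus string headers. */
    static final class Message {
        final String payload;
        final Map<String, String> headers = new HashMap<>();
        Message(String payload) { this.payload = payload; }
    }

    static void send(BlockingQueue<Message> queue, String payload) {
        Message msg = new Message(payload);
        String tp = TRACEPARENT.get();
        if (tp != null) {
            msg.headers.put("traceparent", tp); // inject before the message leaves
        }
        queue.add(msg);
    }

    /** Takes one message and returns the traceparent active during processing. */
    static String receiveAndProcess(BlockingQueue<Message> queue) throws InterruptedException {
        Message msg = queue.take();
        TRACEPARENT.set(msg.headers.get("traceparent")); // extract before business logic
        try {
            return TRACEPARENT.get(); // stand-in for processMessage(msg)
        } finally {
            TRACEPARENT.remove();
        }
    }

    static String demo() {
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        String[] seen = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                seen[0] = receiveAndProcess(queue);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        TRACEPARENT.set("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01");
        try {
            send(queue, "order-42");
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            TRACEPARENT.remove();
        }
        return seen[0];
    }
}
```

The consumer thread never shares memory-based context with the producer; it only sees what travelled in the message headers, which is exactly the constraint a real broker imposes.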

Pattern 5: Baggage for Cross-Service Metadata 🎒

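Baggage allows you to propagate arbitrary key-value pairs alongside trace context. Use it for:

  • User IDs for correlating requests (not as proof of identity)
  • Tenant IDs in multi-tenant systems
  • Feature flags that affect request handling
  • Session identifiers

⚠️ Warning: Baggage adds overhead to every propagation. Keep it small (< 1KB total). It travels as plain-text headers that any intermediary can read or modify, so never put secrets in it or trust it on its own for authorization decisions.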

Baggage Propagation:

  Service A                Service B                Service C
     │                        │                        │
     │  Request               │                        │
     ├──────────────────────→ │                        │
     │  Headers:              │  Request               │
     │   traceparent: ...     ├──────────────────────→ │
     │   baggage:             │   Headers:             │
     │    userId=12345,       │    traceparent: ...    │
     │    tier=premium        │    baggage:            │
     │                        │     userId=12345,      │
     │                        │     tier=premium       │
     │                        │                        │
     │                        │  Can read baggage      │
     │                        │  without parsing logs! │
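
The `baggage` header shown in the diagram is a comma-separated list of `key=value` members, each optionally followed by `;property` metadata. A minimal codec sketch follows; `BaggageCodec` is hypothetical, it assumes keys are already plain tokens that need no encoding, and its 1 KB budget comes from this lesson's guideline rather than from the specification.

```java
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class BaggageCodec {
    // Size budget taken from this lesson's guideline, not from the W3C spec.
    static final int MAX_BYTES = 1024;

    static String encode(Map<String, String> entries) {
        StringJoiner joined = new StringJoiner(",");
        for (Map.Entry<String, String> e : entries.entrySet()) {
            joined.add(e.getKey() + "=" + percentEncode(e.getValue()));
        }
        String header = joined.toString();
        if (header.getBytes(StandardCharsets.UTF_8).length > MAX_BYTES) {
            throw new IllegalArgumentException("baggage exceeds " + MAX_BYTES + " bytes");
        }
        return header;
    }

    static Map<String, String> decode(String header) {
        Map<String, String> entries = new LinkedHashMap<>();
        if (header == null || header.isBlank()) return entries;
        for (String member : header.split(",")) {
            String keyValue = member.split(";", 2)[0]; // ignore optional properties
            int eq = keyValue.indexOf('=');
            if (eq <= 0) continue; // skip malformed members
            String key = keyValue.substring(0, eq).trim();
            String value = keyValue.substring(eq + 1).trim();
            entries.put(key, percentDecode(value));
        }
        return entries;
    }

    private static String percentEncode(String s) {
        // URLEncoder emits "+" for spaces; use "%20" so the value is purely percent-encoded.
        return URLEncoder.encode(s, StandardCharsets.UTF_8).replace("+", "%20");
    }

    private static String percentDecode(String s) {
        // Protect a literal "+" so URLDecoder does not turn it into a space.
        return URLDecoder.decode(s.replace("+", "%2B"), StandardCharsets.UTF_8);
    }
}
```

Encoding a map with `userId=12345` and `tier=premium` (in that insertion order) produces the header value from the diagram, and an oversized map fails fast at the producer instead of silently slowing every downstream call.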

Use Cases:

  1. Dynamic sampling: Sample 100% of premium user requests
  2. Conditional routing: Route canary deployments based on baggage flag
  3. Security context: Propagate non-sensitive identity hints (never credentials, and re-verify before trusting them)
  4. Business context: Track campaign IDs for attribution
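
As an illustration of the dynamic sampling use case, the sketch below keeps every trace for premium users and a deterministic fraction of everything else. The policy, the `tier` key, and the `BaggageSampler` class are all hypothetical; a real deployment would normally express this through its tracing SDK's sampler configuration.

```java
import java.util.Map;

class BaggageSampler {
    /**
     * Hypothetical policy: always sample when baggage says tier=premium,
     * otherwise sample roughly basePercent percent of traces, chosen
     * deterministically from the trace ID so every service agrees.
     */
    static boolean shouldSample(Map<String, String> baggage, String traceId, int basePercent) {
        if ("premium".equals(baggage.get("tier"))) {
            return true;
        }
        // Treat the last 8 hex characters of the trace ID as a stable pseudo-random number.
        String tail = traceId.substring(traceId.length() - 8);
        long bucket = Long.parseLong(tail, 16) % 100;
        return bucket < basePercent;
    }
}
```

Because the decision depends only on the baggage and the trace ID, two services evaluating the same request reach the same answer without coordinating.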

Detailed Examples 🎯

Example 1: Thread Pool with Manual Wrapping

Scenario: You have a REST API endpoint that spawns background work in a thread pool. Without context propagation, the background work loses the trace ID.

Problem Code:

@GetMapping("/process")
public Response processRequest(@RequestParam String data) {
    // Current thread has trace context
    logger.info("Request received"); // ✓ Trace ID logged

    executor.submit(() -> {
        // New thread, NO trace context!
        logger.info("Background work started"); // ❌ No trace ID!
        heavyComputation(data);
    });

    return Response.accepted();
}

Solution with Wrapper Pattern:

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;

public class TraceContextRunnable implements Runnable {
    private final Runnable delegate;
    private final Context capturedContext;

    public TraceContextRunnable(Runnable delegate) {
        this.delegate = delegate;
        this.capturedContext = Context.current(); // Capture!
    }

    @Override
    public void run() {
        try (Scope scope = capturedContext.makeCurrent()) {
            // Context restored!
            delegate.run();
        }
    }
}

@GetMapping("/process")
public Response processRequest(@RequestParam String data) {
    logger.info("Request received"); // ✓ Trace ID: abc123

    Runnable task = () -> {
        logger.info("Background work started"); // ✓ Trace ID: abc123
        heavyComputation(data);
    };

    // Wrapped! OpenTelemetry's built-in equivalent is Context.current().wrap(task)
    executor.submit(new TraceContextRunnable(task));

    return Response.accepted();
}

What Changed:

  • Context captured at wrapper construction time (in request thread)
  • makeCurrent() restores context in worker thread
  • try-with-resources ensures cleanup
  • Logs now show same trace ID across threads! 🎉

Example 2: Kafka Producer-Consumer Chain

Scenario: Order service publishes to Kafka, fulfillment service consumes. We need the trace to span both services.

Producer Side:

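import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapSetter;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;

// Writes each propagation field (such as traceparent) as a Kafka header
private static final TextMapSetter<Headers> SETTER =
    (headers, key, value) -> headers.add(key, value.getBytes(StandardCharsets.UTF_8));

public void publishOrder(Order order) {
    ProducerRecord<String, Order> record =
        new ProducerRecord<>("orders", order.getId(), order);

    // Inject the current context into Kafka headers before sending
    GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
        .inject(Context.current(), record.headers(), SETTER);

    kafkaProducer.send(record);
    logger.info("Order published"); // Trace ID: xyz789
}

Consumer Side:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

private static final TextMapGetter<Headers> GETTER = new TextMapGetter<Headers>() {
    @Override
    public Iterable<String> keys(Headers headers) {
        List<String> keys = new ArrayList<>();
        headers.forEach(h -> keys.add(h.key()));
        return keys;
    }
    @Override
    public String get(Headers headers, String key) {
        Header h = headers.lastHeader(key);
        return h == null ? null : new String(h.value(), StandardCharsets.UTF_8);
    }
};

@KafkaListener(topics = "orders")
public void consumeOrder(ConsumerRecord<String, Order> record) {
    // Extract context from headers before any business logic runs
    Context ctx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
        .extract(Context.current(), record.headers(), GETTER);

    try (Scope scope = ctx.makeCurrent()) {
        Order order = record.value();
        logger.info("Processing order"); // Same trace ID: xyz789 ✓
        fulfillmentService.fulfill(order);
    }
}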

Key Points:

  • Producer serializes the current context into the traceparent header
  • Headers are the transport mechanism (metadata, not payload)
  • Consumer immediately restores context before processing
  • Both services log with same trace ID, creating unified trace

Unified Trace View in APM:

┌─────────────────────────────────────────────────┐
│  Trace ID: xyz789                               │
├─────────────────────────────────────────────────┤
│                                                 │
│  ┌──────────────────────────────────────┐       │
│  │ Span: POST /orders (Order Service)   │       │
│  │ Duration: 15ms                       │       │
│  └──────────────────────────────────────┘       │
│           │                                     │
│           └─→ kafka.send                        │
│                     │                           │
│                     ↓                           │
│  ┌──────────────────────────────────────┐       │
│  │ Span: kafka.consume (Fulfillment)    │       │
│  │ Duration: 250ms                      │       │
│  └──────────────────────────────────────┘       │
│           │                                     │
│           └─→ fulfill (downstream calls)        │
│                                                 │
└─────────────────────────────────────────────────┘

End-to-end visibility across async boundary! 🎯

Example 3: Project Reactor Context Propagation

Scenario: Reactive API endpoint that chains multiple async operations. Context must flow through the entire reactive pipeline.

Problem Code:

@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {
    return userRepository.findById(id)  // DB call
        .flatMap(user -> {
            // Context lost here! No trace ID
            logger.info("Enriching user");
            return enrichmentService.enrich(user);
        })
        .map(user -> {
            // Still no context
            logger.info("Transforming user");
            return transform(user);
        });
}

Solution with Reactor Context:

@GetMapping("/users/{id}")
public Mono<User> getUser(@PathVariable String id) {
    // Read the trace ID from the active OpenTelemetry span (io.opentelemetry.api.trace.Span)
    String traceId = Span.current().getSpanContext().getTraceId();

    return userRepository.findById(id)
        .flatMap(user -> {
            // Retrieve context from reactor context
            return Mono.deferContextual(reactorCtx -> {
                String tid = reactorCtx.get("traceId");
                logger.info("Enriching user, trace={}", tid);
                return enrichmentService.enrich(user);
            });
        })
        .map(user -> {
            logger.info("Transforming");
            return transform(user);
        })
        .contextWrite(reactorContext -> 
            reactorContext.put("traceId", traceId)
        ); // Inject context at subscription time
}

Better Approach with Hook:

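Instead of manual injection, register a Reactor hook that propagates OpenTelemetry context automatically. One option, assuming the opentelemetry-reactor-3.1 instrumentation library is on the classpath, is its ContextPropagationOperator (the OpenTelemetry Java agent registers an equivalent hook for you):

import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;

@Configuration
public class TracingConfiguration {

    @PostConstruct
    public void configureReactorHooks() {
        // Installs a hook on every operator that restores the OpenTelemetry context
        ContextPropagationOperator.create().registerOnEachOperator();
    }
}

Now context propagates automatically through reactive chains assembled after the hook is registered! 🚀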

Example 4: Scheduled Task Context Injection

Scenario: Cron job runs every hour but has no incoming request to provide context. How do we create traceable executions?

Problem: No parent context exists for scheduled tasks.

Solution: Create root span at task start:

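import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;

@Scheduled(cron = "0 0 * * * *")
public void hourlyCleanup() {
    // No incoming request, so there is no context to inherit: start a new trace
    Tracer tracer = GlobalOpenTelemetry.getTracer("scheduler");
    Span span = tracer.spanBuilder("hourly-cleanup")
        .setNoParent() // force a root span even if stale context lingers on this thread
        .setSpanKind(SpanKind.INTERNAL)
        .startSpan();

    try (Scope scope = span.makeCurrent()) {
        // Now we have a root context!
        logger.info("Starting cleanup"); // Trace ID present ✓

        cleanupService.removeExpiredData();
        // All downstream calls inherit this context

        span.setStatus(StatusCode.OK);
    } catch (RuntimeException e) {
        span.recordException(e);
        span.setStatus(StatusCode.ERROR);
        throw e;
    } finally {
        span.end();
    }
}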

Key Techniques:

  • Create root span explicitly (no parent)
  • Wrap entire task execution in span scope
  • Record success/failure status
  • All spawned work inherits this root context

Scheduled Task Trace:

┌─────────────────────────────────────────────┐
│  Trace ID: auto-generated-root              │
├─────────────────────────────────────────────┤
│                                             │
│  ┌─────────────────────────────────┐        │
│  │ Span: hourly-cleanup (ROOT)     │        │
│  │ Kind: INTERNAL                  │        │
│  │ Duration: 5.2s                  │        │
│  └─────────────────────────────────┘        │
│       │                                     │
│       ├─→ removeExpiredSessions (2.1s)      │
│       ├─→ cleanupTempFiles (1.8s)           │
│       └─→ compactDatabase (1.3s)            │
│                                             │
└─────────────────────────────────────────────┘

Full observability for background jobs! 📊
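
What "creating a root context" means at the wire level can be shown without a tracing SDK: a root is simply a fresh random trace ID and span ID with no inherited parent. The sketch below is a JDK-only illustration; `RootTraceContext` and its `TRACEPARENT` thread-local are hypothetical, and a real service would let its tracer generate the IDs.

```java
import java.security.SecureRandom;

class RootTraceContext {
    private static final SecureRandom RANDOM = new SecureRandom();
    // Hypothetical thread-local slot holding the active traceparent value.
    static final ThreadLocal<String> TRACEPARENT = new ThreadLocal<>();

    private static String randomHex(int bytes) {
        byte[] buf = new byte[bytes];
        RANDOM.nextBytes(buf);
        StringBuilder sb = new StringBuilder(bytes * 2);
        for (byte b : buf) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    /** Builds a fresh, sampled, version-00 traceparent that has no parent trace. */
    static String newRootTraceparent() {
        // 16-byte trace ID and 8-byte span ID. An all-zero ID would be invalid,
        // but the chance of drawing one is negligible for this sketch.
        return "00-" + randomHex(16) + "-" + randomHex(8) + "-01";
    }

    /** Runs a job inside a new root context and clears the context afterwards. */
    static void runAsRoot(Runnable job) {
        TRACEPARENT.set(newRootTraceparent());
        try {
            job.run();
        } finally {
            TRACEPARENT.remove();
        }
    }
}
```

Each call to `runAsRoot` gives the job its own trace, so two hourly runs never end up merged into one trace by accident.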

Common Mistakes ⚠️

Mistake 1: Forgetting to Restore Context

❌ Wrong:

executor.submit(() -> {
    Context ctx = capturedContext; // Captured but NOT restored!
    doWork();
});

✅ Right:

executor.submit(() -> {
    try (Scope scope = capturedContext.makeCurrent()) {
        doWork(); // Context active!
    }
});

Why it matters: Capturing without restoring means context exists in memory but isn't active on the thread. Instrumentation libraries won't see it.

Mistake 2: Capturing Context Too Late

❌ Wrong:

executor.submit(() -> {
    Context ctx = Context.current(); // Captures worker thread context (empty!)
    doWork();
});

✅ Right:

Context ctx = Context.current(); // Capture BEFORE submitting
executor.submit(() -> {
    try (Scope scope = ctx.makeCurrent()) {
        doWork();
    }
});

Why it matters: Context must be captured in the originating thread before crossing the boundary.

Mistake 3: Not Cleaning Up Context

❌ Wrong:

context.makeCurrent();
doWork();
// Scope never closed: context stays active on this thread

✅ Right:

try (Scope scope = context.makeCurrent()) {
    doWork();
} // Scope auto-closes, context cleaned up

Why it matters: Threads are typically pooled and reused. Leftover context corrupts subsequent tasks on the same thread.
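
The corruption described above is easy to reproduce with a single-thread pool, which guarantees that two tasks share one thread. This is a JDK-only sketch; `TRACE_ID` is a hypothetical stand-in for the tracing context.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ContextLeakDemo {
    // Hypothetical thread-local slot standing in for the tracing context.
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    /** Runs two tasks on the same pooled thread; returns what the second one sees. */
    static String secondTaskSees(boolean cleanUp) {
        ExecutorService pool = Executors.newSingleThreadExecutor(); // forces thread reuse
        try {
            pool.submit(() -> {
                TRACE_ID.set("request-1");
                try {
                    // ... work for request 1 would happen here ...
                } finally {
                    if (cleanUp) {
                        TRACE_ID.remove();
                    }
                }
            }).get();
            Callable<String> secondTask = TRACE_ID::get;
            return pool.submit(secondTask).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Without cleanup, the second task inherits `request-1` and would log under the wrong trace; with cleanup it starts from an empty slot, which is what try-with-resources on a scope achieves in the examples above.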

Mistake 4: Injecting or Extracting at the Wrong Time

In message queues, context injection must happen BEFORE send, and extraction must happen BEFORE processing. Don't inject context in a send callback or completion listener: by then the message has already left without its headers.

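Mistake 5: Overusing Baggage

❌ Wrong: Adding large objects to baggage

Baggage.current().toBuilder()
    .put("userProfile", jsonSerializer.serialize(user)) // Huge!
    .build();

✅ Right: Only propagate IDs

Baggage.current().toBuilder()
    .put("userId", user.getId()) // Small
    .build(); // Baggage is immutable; call makeCurrent() on the result to activate it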

Why it matters: Baggage is transmitted with every single downstream call. Large baggage kills performance.

Key Takeaways 🎓

  1. Async boundaries break context unless you explicitly propagate it through capture-serialize-restore patterns

  2. Choose the right pattern for your architecture:

    • Thread pools → Wrapper or Instrumented Executor
    • Message queues → Header injection with W3C Trace Context
    • Reactive streams → Reactor Context or hooks
    • Scheduled tasks → Root span creation
  3. The W3C Trace Context standard provides interoperability for propagating context across services and vendors

  4. Always capture context before the boundary and restore immediately on the other side

  5. Use try-with-resources for context scopes to ensure cleanup and prevent pollution

  6. Baggage enables cross-cutting concerns but must be kept small to avoid performance impact

  7. Instrumentation libraries can help but understanding the underlying patterns makes you effective when auto-instrumentation isn't available

📋 Quick Reference Card

Boundary Type   | Pattern                          | Capture Point   | Restore Point
----------------|----------------------------------|-----------------|-------------------
Thread Pool     | Wrapper or Instrumented Executor | Before submit() | Start of run()
Message Queue   | Header Injection                 | Before send()   | Start of consume()
Reactive Stream | Reactor Context / Hooks          | At subscription | Each operator
Scheduled Task  | Root Span Creation               | Task start      | Immediate
HTTP Call       | Header Injection                 | Before request  | Server entry point

🧠 Memory Aid - "CCR": Capture (before boundary) → Carry (serialize/transport) → Restore (after boundary)

⚡ Pro Tips:

  • Use OpenTelemetry auto-instrumentation when possible, since it handles most patterns automatically
  • Test async boundaries explicitly in integration tests by verifying trace ID continuity
  • Monitor for "orphaned" spans (spans without parents) as indicators of broken propagation
  • Document which executors/queues have context propagation enabled
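
The orphaned-span tip can be turned into a simple check over exported span data. The sketch below assumes a hypothetical `SpanRecord` shape (span ID, parent ID, name); a broken boundary may show up either as a span whose parent is missing from the trace, or as an unexpected extra root, so the sketch reports both.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class OrphanSpanFinder {
    /** Hypothetical minimal span record; parentId is null for a root span. */
    static final class SpanRecord {
        final String spanId;
        final String parentId;
        final String name;

        SpanRecord(String spanId, String parentId, String name) {
            this.spanId = spanId;
            this.parentId = parentId;
            this.name = name;
        }
    }

    /** Returns spans that name a parent which is absent from the same trace. */
    static List<SpanRecord> findOrphans(List<SpanRecord> trace) {
        Set<String> knownIds = new HashSet<>();
        for (SpanRecord span : trace) {
            knownIds.add(span.spanId);
        }
        List<SpanRecord> orphans = new ArrayList<>();
        for (SpanRecord span : trace) {
            if (span.parentId != null && !knownIds.contains(span.parentId)) {
                orphans.add(span);
            }
        }
        return orphans;
    }

    /** Counts root spans; more than one per request usually means a context break. */
    static long countRoots(List<SpanRecord> trace) {
        return trace.stream().filter(span -> span.parentId == null).count();
    }
}
```

A check like this fits naturally into an integration test: drive one request through an async boundary, collect the spans, and assert that there are no orphans and exactly one root.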

Further Study 📚

  1. W3C Trace Context Specification: https://www.w3.org/TR/trace-context/ - Official standard for distributed trace propagation

  2. OpenTelemetry Context Propagation: https://opentelemetry.io/docs/instrumentation/java/manual/#context-propagation - Practical guide with code examples

  3. Project Reactor Context: https://projectreactor.io/docs/core/release/reference/#context - Deep dive into reactive context propagation

You now have the patterns to ensure observability signals survive any async boundary. Practice implementing these patterns in your own services, and watch your distributed traces become complete! 🚀