Prompt Chaining & Parallel Workflows

Build multi-step agent pipelines using sequential chains, routers, fan-out/fan-in, and ensemble patterns.

Why Multi-Step Agent Pipelines Exist

Imagine handing a single employee a task that reads: "Research our top three competitors, summarize each one's pricing strategy, identify gaps in our own offering, draft a memo for the executive team, and flag any claims that need legal review." Even a skilled person would struggle to hold all of that in their head simultaneously and produce reliable output in one uninterrupted pass. They'd naturally break it down — research first, then summarize, then analyze, then write, then review. The same constraint applies to language models. When we ask a single LLM call to do too much at once, something gives: reasoning quality degrades, important intermediate steps get skipped, and there's no checkpoint where we can catch errors before they compound. This section explains why that happens mechanically, and why the answer — multi-step agent pipelines — is not just a workaround but a genuine architectural pattern worthy of first-class design thinking.

The Problem With Doing Everything in One Prompt

A single LLM call seems appealingly simple. One input, one output, done. The trouble is that real-world tasks have a structure that punishes simplicity. Three specific failure modes emerge reliably when complex tasks are crammed into a single prompt.

Failure Mode 1: Context Window Pressure

Context window pressure is the most obvious constraint. Every token of input (instructions, background documents, conversation history, examples) competes for space in the model's finite context. On a task like "read these five customer support transcripts, identify recurring complaints, and propose three product improvements," the transcripts alone can consume most of the available space, leaving little room for the model to reason about them in its output. The model isn't broken; it's working with what it was given. Spreading the work across discrete steps (extract complaints from each transcript individually, then synthesize, then propose) means each step works with a focused, appropriately sized context.

Failure Mode 2: Compounded Reasoning Errors

The second failure mode is subtler and more damaging: compounded reasoning errors. Language models are not perfect reasoners, and the longer the chain of logical steps required within a single response, the higher the probability that a small misstep early on will cascade. Think of it like mental arithmetic with many digits — each operation introduces a small chance of error, and errors propagate forward. When you decompose a task into explicit steps and validate the output of each step before passing it along, you contain the blast radius of any single mistake. A faulty classification in step two doesn't corrupt step five if step three includes a validation gate.
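The arithmetic behind this intuition is easy to sketch. Assume each step independently succeeds with probability p. This is a simplification, because real model errors are often correlated. Under that assumption, an unchecked n-step chain succeeds with probability p**n. If a validation gate catches every error at a step and allows one retry, the per-step success rate rises to 1 - (1 - p)**2. The function names below are illustrative only.

```python
def chain_success(p: float, n: int) -> float:
    """Probability that all n independent steps succeed with no checks."""
    return p ** n


def gated_chain_success(p: float, n: int, retries: int = 1) -> float:
    """Probability that all n steps succeed when an (idealized) validation
    gate catches every error and allows `retries` extra independent attempts."""
    per_step = 1 - (1 - p) ** (retries + 1)
    return per_step ** n


# Ten steps, each 95% reliable on its own.
print(f"unchecked: {chain_success(0.95, 10):.3f}")        # unchecked: 0.599
print(f"gated:     {gated_chain_success(0.95, 10):.3f}")  # gated:     0.975
```

These assumptions are optimistic, because real gates miss some errors. Even so, the gap (roughly 60% versus 97.5% end-to-end) shows why checkpoints between steps matter.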

Failure Mode 3: No Intermediate Validation

The third failure mode is the absence of intermediate validation. With a single prompt, you get one shot. If the model misunderstands the task, interprets an ambiguous instruction in an unexpected way, or produces an output that's technically correct but wrong for downstream use, you discover this only at the very end — after the full generation is complete. There's no natural checkpoint. Pipelines, by contrast, surface outputs at each stage boundary, where they can be inspected, logged, routed, or rejected before the next stage begins.

💡 Real-World Example: A common pattern in production code review agents is to first extract the diff, then classify what kind of change it is (bug fix, refactor, new feature), then apply reviewers appropriate to that category, then synthesize feedback. Doing all of this in one prompt frequently produces generic feedback that misses the nuances a specialized reviewer would catch. Breaking it into stages — extract, classify, route, review, synthesize — lets each stage be optimized and tested independently.

Decomposition as a Design Principle

The move from a monolithic prompt to a decomposed pipeline is not just a technical convenience — it reflects a deeper design principle: each stage should do exactly one thing well, produce a clearly defined output, and be independently testable.

This maps directly to principles software engineers already respect. A function that does too many things is hard to test, hard to debug, and fragile when requirements change. The same is true of an LLM call that does too many things. When you decompose a task, you gain three concrete engineering properties:

🔧 Independently testable: You can write unit tests or evaluation harnesses for a single stage without needing the entire pipeline to be operational. A stage that extracts structured data from free text can be evaluated against a labeled dataset independently of whatever stage uses that data next.

🔧 Retryable: If a stage fails — due to a model error, a rate limit, or a downstream API being unavailable — you can retry just that stage without re-running the entire task from scratch. This is practically important for long-running pipelines that involve expensive operations like retrieval or tool calls.

🔧 Replaceable: Because each stage has a defined input and output contract, you can swap the implementation of any stage — changing the model, the prompt, or the underlying approach — without touching the stages around it. A classification stage that uses a small, fast model today can be upgraded to a more capable one tomorrow with zero changes to the stages it feeds.
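Stage-level retries can be as simple as a wrapper around one stage function. This is a minimal sketch that assumes stages are plain callables that raise on failure. `run_stage_with_retry` and the flaky stage are hypothetical names. A production version would likely retry only transient error types, such as timeouts and rate limits, and add jitter to the backoff. Here, catching `Exception` broadly keeps the sketch short.

```python
import time
from typing import Callable, TypeVar

InT = TypeVar("InT")
OutT = TypeVar("OutT")


def run_stage_with_retry(
    stage: Callable[[InT], OutT],
    stage_input: InT,
    max_attempts: int = 3,
    base_delay: float = 0.5,
) -> OutT:
    """Run a single pipeline stage, retrying only that stage on failure.

    Earlier stages are not re-run: the caller passes in their saved output.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return stage(stage_input)
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Exponential backoff between attempts: base, 2*base, 4*base, ...
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise RuntimeError("unreachable")  # the loop always returns or raises


# Usage: a hypothetical classification stage that hits a (simulated)
# rate limit twice before succeeding.
calls = {"n": 0}


def flaky_classify(diff: str) -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("simulated rate limit")
    return "bug_fix"


print(run_stage_with_retry(flaky_classify, "diff text", base_delay=0.01))  # bug_fix
```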

Here's a simple illustration of what this looks like in code. Rather than one large prompt, consider three small functions, each with a typed input and output:

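from dataclasses import dataclass

# Placeholder model calls (hypothetical stubs). Swap in real LLM calls;
# these return trivial values so the pipeline structure runs end to end.
def call_llm_extract(text: str) -> list[str]:
    return [s.strip() for s in text.split(".") if s.strip()]

def call_llm_classify(claims: list[str]) -> tuple[list[str], list[str]]:
    return claims, []  # stub: treat every claim as factual

def call_llm_verify(claims: list[str]) -> tuple[list[str], list[str]]:
    return [], claims  # stub: nothing verified without a real knowledge source

def call_llm_summarize(verified: list[str], unverified: list[str]) -> str:
    return f"{len(verified)} verified, {len(unverified)} unverified"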

@dataclass
class ExtractedClaims:
    claims: list[str]

@dataclass
class ClassifiedClaims:
    factual: list[str]
    opinion: list[str]

@dataclass
class VerificationReport:
    verified: list[str]
    unverified: list[str]
    summary: str

def extract_claims(article_text: str) -> ExtractedClaims:
    """
    Stage 1: Ask the model to extract discrete factual claims
    from the article. Small, focused prompt.
    """
    # In practice, this calls your LLM with a focused extraction prompt.
    # Returning a typed dataclass enforces the contract with Stage 2.
    claims = call_llm_extract(article_text)  # returns list[str]
    return ExtractedClaims(claims=claims)

def classify_claims(extracted: ExtractedClaims) -> ClassifiedClaims:
    """
    Stage 2: Separate factual claims from opinions.
    Only receives the claim list — not the full article.
    """
    factual, opinion = call_llm_classify(extracted.claims)
    return ClassifiedClaims(factual=factual, opinion=opinion)

def verify_claims(classified: ClassifiedClaims) -> VerificationReport:
    """
    Stage 3: Verify only the factual claims against a knowledge source.
    Ignores opinions entirely — they were separated upstream.
    """
    verified, unverified = call_llm_verify(classified.factual)
    summary = call_llm_summarize(verified, unverified)
    return VerificationReport(
        verified=verified,
        unverified=unverified,
        summary=summary
    )

# The pipeline is explicit: each stage's output feeds the next.
def run_fact_check_pipeline(article_text: str) -> VerificationReport:
    claims = extract_claims(article_text)
    classified = classify_claims(claims)
    report = verify_claims(classified)
    return report

This code demonstrates the key principle: each function accepts a typed input and returns a typed output. classify_claims expects an ExtractedClaims object rather than the raw article text, and a static type checker such as mypy will flag any call that passes a plain string instead. (Python itself does not enforce these annotations at runtime.) That's the data contract enforcing stage boundaries. The call_llm_* helpers are placeholders representing your actual model calls; what matters here is the structure around them.

(Note: this example is simplified for clarity — a production implementation would add error handling, retries, and logging at each stage boundary, which we'll cover later in this lesson.)

🎯 Key Principle: A pipeline stage is defined by its contract, not its implementation. The input type, output type, and failure modes of a stage are what the rest of the system depends on — not whether the implementation uses GPT-4, Claude, a rule-based classifier, or a database lookup. This substitutability is what makes pipelines resilient.
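One way to make "contract, not implementation" concrete in Python is a `typing.Protocol` that names the stage signature. Both classifiers below are hypothetical. One is a keyword rule. The other wraps a placeholder completion function that stands in for a real model client. Either can be passed wherever a `ChangeClassifier` is expected, and the downstream stage never learns which one it got.

```python
from typing import Callable, Literal, Protocol, cast

ChangeKind = Literal["bug_fix", "refactor", "new_feature"]
ALLOWED: tuple[str, ...] = ("bug_fix", "refactor", "new_feature")


class ChangeClassifier(Protocol):
    """The stage contract: a diff in, one ChangeKind out, ValueError on failure."""

    def __call__(self, diff: str) -> ChangeKind: ...


def rule_based_classifier(diff: str) -> ChangeKind:
    """Cheap keyword heuristic that satisfies the contract."""
    lowered = diff.lower()
    if "fix" in lowered:
        return "bug_fix"
    if "rename" in lowered or "extract" in lowered:
        return "refactor"
    return "new_feature"


def make_llm_classifier(llm_complete: Callable[[str], str]) -> ChangeClassifier:
    """Wrap a hypothetical text-completion function in the same contract."""

    def classify(diff: str) -> ChangeKind:
        answer = llm_complete(f"Classify this diff as one of {ALLOWED}:\n{diff}")
        answer = answer.strip().lower()
        if answer not in ALLOWED:
            raise ValueError(f"classifier returned {answer!r}")  # contract's failure mode
        return cast(ChangeKind, answer)

    return classify


def review_stage(diff: str, classifier: ChangeClassifier) -> str:
    """Downstream stage: depends on the contract, not on the implementation."""
    return f"Routing to {classifier(diff)} reviewer"


def fake_llm(prompt: str) -> str:
    """Stand-in for a real model client."""
    return " Refactor\n"


# Either implementation plugs into the same downstream stage.
print(review_stage("Fix off-by-one in pager", rule_based_classifier))       # Routing to bug_fix reviewer
print(review_stage("Rename helper functions", make_llm_classifier(fake_llm)))  # Routing to refactor reviewer
```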

Pipelines vs. Loops: A Critical Distinction

Before going further, it's worth being precise about one distinction that trips up many developers building their first agentic systems: the difference between a pipeline and a loop.

A loop repeats the same operation. If you call an LLM in a while loop, prompting it to refine its answer until a condition is met, you have a loop — the same operation, iterated. Loops are useful and appear frequently in agentic systems, but they are not pipelines.

Wrong thinking: "My agent keeps calling the LLM until it's satisfied, so it has a multi-step pipeline."

Correct thinking: "My pipeline has explicit stages with different responsibilities and typed data contracts between them. A loop is one tool I might use within a stage, but the pipeline is the structure connecting stages."

The practical difference matters because loops give you iteration but not decomposition. A loop that runs the same prompt ten times still suffers from all three failure modes described above — it just retries the same operation rather than advancing through a structured decomposition of the task. A pipeline advances state from one specialized stage to the next, transforming the representation of the problem as it goes.

# ❌ A loop: the same operation repeated
# (call_llm, generate_outline, FinalDocument, etc. are placeholders for
# your own model calls and types.)
def loop_refine(draft: str, iterations: int = 3) -> str:
    # Problem: no stage specialization and no typed contracts;
    # errors introduced in iteration 1 are carried into iteration 2.
    for _ in range(iterations):
        draft = call_llm(f"Improve this draft: {draft}")
    return draft

# ✅ A pipeline: different operations with explicit contracts
def pipeline_draft_and_review(topic: str) -> FinalDocument:
    # Each stage has a different job, and each intermediate result
    # (outline, draft, critique) can be inspected or validated before
    # the next stage consumes it.
    outline = generate_outline(topic)              # Stage 1: structure
    draft = write_from_outline(outline)            # Stage 2: prose generation
    critique = critique_draft(draft)               # Stage 3: evaluation
    final = revise_with_critique(draft, critique)  # Stage 4: revision
    return final

🤔 Did you know? The distinction between iteration and staged decomposition has a direct analog in compiler design. A compiler doesn't apply one transformation to your source code over and over. Instead, it passes the code through distinct phases (lexing, parsing, semantic analysis, optimization, code generation), and each phase produces a defined intermediate representation. Those intermediate representations are the data contracts. Agentic pipelines are structured the same way, for the same reason: different transformations require different representations.

Four Structural Patterns — A First Look

Not all pipelines are the same shape. The tasks you'll encounter in real software development call for different topologies, and having names for them lets you reason about designs quickly. This lesson covers four foundational patterns. Think of this as an initial map — we'll develop each one in detail in subsequent sections.

┌──────────────────────────────────────────────────────────────────────┐
│                        FOUR PIPELINE PATTERNS                        │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  1. SEQUENTIAL CHAIN                                                 │
│     [A] ──► [B] ──► [C] ──► [D]                                      │
│     Output of each step feeds directly into the next.                │
│                                                                      │
│  2. ROUTER                                                           │
│     [Input] ──► [Router] ──┬──► [Branch A]                           │
│                            ├──► [Branch B]                           │
│                            └──► [Branch C]                           │
│     A classification step selects which path to execute.             │
│                                                                      │
│  3. FAN-OUT / FAN-IN                                                 │
│                           ┌──► [Worker A] ──┐                        │
│     [Input] ──► [Split] ──┼──► [Worker B] ──┼──► [Merge] ──► Output  │
│                           └──► [Worker C] ──┘                        │
│     Work is distributed to parallel workers and aggregated.          │
│                                                                      │
│  4. ENSEMBLE                                                         │
│               ┌──► [Model/Prompt A] ──┐                              │
│     [Input] ──┼──► [Model/Prompt B] ──┼──► [Judge/Merge] ──► Output  │
│               └──► [Model/Prompt C] ──┘                              │
│     Multiple independent attempts are aggregated or judged.          │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

Pattern 1: Sequential Chain

The sequential chain is the baseline pattern: stage A produces output that becomes stage B's input, stage B feeds stage C, and so on. This is the pattern illustrated in the fact-checking example above. It's the right choice when each transformation depends on the previous one — when you genuinely need the output of step one before you can begin step two. Sequential chains make errors easy to localize (the bug is in whichever stage produces wrong output), and they're straightforward to test and monitor. The cost is latency: each step must complete before the next begins.
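Because every stage in a sequential chain consumes the previous stage's output, the chain itself reduces to function composition. The sketch below uses `functools.reduce`. The two toy stages are hypothetical stand-ins for LLM calls.

```python
from functools import reduce
from typing import Any, Callable, Sequence

Stage = Callable[[Any], Any]


def run_chain(stages: Sequence[Stage], initial: Any) -> Any:
    """Feed `initial` through each stage in order: stages[-1](...stages[0](initial))."""
    return reduce(lambda value, stage: stage(value), stages, initial)


def split_sentences(text: str) -> list[str]:
    """Toy stage 1: text -> list of sentences."""
    return [s.strip() for s in text.split(".") if s.strip()]


def count_words(sentences: list[str]) -> list[int]:
    """Toy stage 2: sentences -> word count per sentence."""
    return [len(s.split()) for s in sentences]


print(run_chain([split_sentences, count_words], "Pipelines help. Each stage does one job."))
# [2, 5]
```

A plain `for` loop over the stage list works just as well. It also makes it easier to insert logging or validation between stages, which is usually where a real chain needs the most care.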

Pattern 2: Router

The router pattern introduces conditional branching. Rather than executing a fixed sequence for every input, a router stage classifies or analyzes the input and selects which downstream path to invoke. A customer support agent might classify an incoming message as a billing question, a technical issue, or a general inquiry — and route each to a different specialized handler. Routers reduce waste (you don't run expensive reasoning on simple cases) and improve quality (specialized handlers outperform generalist ones on their target domain).

Pattern 3: Fan-Out / Fan-In

Fan-out / fan-in addresses a different problem: tasks that can be parallelized. When a task consists of multiple independent sub-tasks — summarizing ten documents, translating a report into five languages, evaluating a codebase across three quality dimensions — you can distribute those sub-tasks to parallel workers (fan-out) and then aggregate the results (fan-in). This pattern is primarily a latency and throughput optimization, but it also enables the aggregation stage to do something a single-call approach cannot: it can compare, reconcile, or synthesize across independently produced results.
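Here is a minimal fan-out / fan-in sketch that uses the standard library's `concurrent.futures.ThreadPoolExecutor`, which suits I/O-bound work such as waiting on model API calls. `summarize` is a hypothetical placeholder for an LLM call. In this sketch the fan-in step simply concatenates the results, whereas a real merge stage might be another LLM call that reconciles them.

```python
from concurrent.futures import ThreadPoolExecutor


def summarize(doc: str) -> str:
    """Placeholder worker: keeps the first sentence. A real one would call an LLM."""
    return doc.split(".")[0].strip()


def fan_out_fan_in(docs: list[str], max_workers: int = 4) -> str:
    # Fan-out: each document goes to a worker thread. executor.map runs the
    # calls concurrently but yields results in the original input order.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        partials = list(executor.map(summarize, docs))
    # Fan-in: aggregate the independent results into one output.
    return "\n".join(f"- {p}" for p in partials)


docs = [
    "Pricing rose 10%. Churn was flat.",
    "A new tier launched. Reviews were mixed.",
    "Support hours expanded. Tickets fell.",
]
print(fan_out_fan_in(docs))
```

Because `executor.map` preserves input order, the merge stays deterministic. However, an exception in any worker is re-raised when its result is read. If you need to tolerate one failed worker without losing the others, `executor.submit` with `as_completed` gives finer control.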

Pattern 4: Ensemble

The ensemble pattern may look similar to fan-out at first glance — multiple calls run in parallel — but the intent is different. In a fan-out, different workers handle different pieces of the same task. In an ensemble, multiple workers independently attempt the same task, often with different prompts, temperatures, or models. Their outputs are then aggregated by a judging or merging stage. The goal is not parallelism for speed but diversity for reliability: when it matters more to get the right answer than to get a fast one, sampling multiple independent attempts and selecting or synthesizing among them tends to outperform any single attempt. The tradeoff is cost: an ensemble that runs four independent calls costs roughly four times as much as a single call.
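The simplest judging stage is a majority vote over independent answers. When the attempts are sampled from a single model, this is often called self-consistency. The sketch assumes the answers are short, directly comparable strings. The three "attempts" are hypothetical lambdas that stand in for different prompts or models. Free-form outputs usually need an LLM judge rather than exact-match voting.

```python
from collections import Counter
from typing import Callable


def ensemble_vote(question: str, attempts: list[Callable[[str], str]]) -> tuple[str, float]:
    """Run every attempt on the same input; return the most common answer
    and the fraction of attempts that agreed with it."""
    if not attempts:
        raise ValueError("need at least one attempt")
    # Normalize so trivially different strings ("42", " 42 ") count as one vote.
    answers = [attempt(question).strip().lower() for attempt in attempts]
    winner, votes = Counter(answers).most_common(1)[0]
    return winner, votes / len(answers)


attempts = [
    lambda q: "42",
    lambda q: " 42 ",
    lambda q: "41",
]
answer, agreement = ensemble_vote("What is 6 * 7?", attempts)
print(answer, round(agreement, 2))  # 42 0.67
```

The agreement fraction is useful on its own. A low value is a cheap signal to escalate the case to a stronger model or a human reviewer.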

📋 Quick Reference Card:

| 🎯 Pattern | 🔧 Core Structure | 📚 Solves | ⚠️ Tradeoff |
|---|---|---|---|
| 🔗 Sequential Chain | A → B → C | Dependent transformations | Latency adds up |
| 🗂️ Router | Classify → Branch | Avoiding one-size-fits-all | Router accuracy matters |
| 🌐 Fan-Out/Fan-In | Split → Parallel Workers → Merge | Independent sub-tasks | Aggregation complexity |
| 🎲 Ensemble | Same task × N → Judge | High-stakes correctness | Cost multiplies by N |

🧠 Mnemonic: SRFE, "Some Robots Find Ensembles" (Sequential, Router, Fan-out/Fan-in, Ensemble). This covers the primary structural vocabulary for this lesson. In practice, real pipelines often combine two or more of these patterns in a single flow: a sequential chain that contains a router step, for instance, or a fan-out whose workers each run a short sequential chain.

Why This Framing Matters for Software Architecture

It's tempting to treat pipelines as an implementation detail — something you bolt on when a single prompt stops working. The more productive framing is to treat the pipeline as a first-class architectural decision, made early, with the same deliberateness you'd bring to choosing a data model or an API boundary.

This matters because the choice of pipeline structure determines what becomes observable, what becomes testable, and what can be changed without full rewrites. A system built as a single large prompt is essentially a black box — you can observe inputs and outputs, but the intermediate reasoning is invisible and unreachable. A system built as a pipeline of typed stages is a white box: you can log, inspect, replay, and modify any stage independently.

In practice, teams that adopt the pipeline framing early find it much easier to answer the questions that come up in production: Which stage is producing bad outputs? Can we make this faster by parallelizing some steps? Can we make this cheaper by routing simple cases to a smaller model? These questions are tractable when the pipeline is explicit and much harder when it's implicit.

⚠️ Common Mistake — Mistake 1: Treating the pipeline as an optimization to add later, after the monolithic prompt stops working. The problem is that by the time the monolithic prompt clearly breaks down, the surrounding code has often been built around its input/output shape — making refactoring expensive. Designing for decomposition from the start is almost always worth the small upfront investment.

The sections that follow will build each of these four patterns in detail, with concrete code examples and worked cases. By the end of this lesson, you'll have both the vocabulary to describe multi-step agent designs and the practical skills to build them. Let's start with the foundation: sequential chains and the mechanics of passing structured state between steps.

Sequential Chains and Routers: Passing State Between Steps

A single LLM call is rarely a dead end — it's usually the first domino in a longer sequence. The challenge isn't making the first call work; it's making the second call work correctly given the first call's output, and the third given the second, and so on. Each handoff is an opportunity for silent failure. A misextracted field, an improperly formatted intermediate result, or an ambiguous classification can corrupt everything downstream while the pipeline continues to run without error. This section is about wiring those handoffs correctly: defining shared state, validating at boundaries, and routing to the right branch with confidence.

The Shared Context Object: Your Pipeline's Backbone

The most common mistake in naive prompt-chaining implementations is threading raw strings from one LLM call directly into the prompt of the next. It works for demos. It fails at scale. A raw string carries no schema, no validation surface, and no checkpointing anchor.

The better approach is to define a shared context object — a single data structure that accumulates results as the pipeline progresses. Every step reads from this object and writes back to it. Steps don't talk to each other directly; they talk through the context.

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class PipelineContext:
    # --- Inputs ---
    raw_user_query: str

    # --- Step 1 outputs ---
    intent: Optional[str] = None          # e.g., "research", "summarize", "compare"
    extracted_entities: list[str] = field(default_factory=list)

    # --- Step 2 outputs (branch-dependent) ---
    retrieved_sources: list[str] = field(default_factory=list)

    # --- Step 3 outputs ---
    final_response: Optional[str] = None

    # --- Pipeline metadata ---
    completed_steps: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)

This PipelineContext dataclass does several things worth unpacking. First, it separates inputs from outputs, and early-step outputs from later-step outputs — making the data flow legible at a glance. Second, Optional fields make explicit which data exists at which point in the pipeline. If intent is None when a downstream step tries to use it, that's a signal the step ran out of order or an earlier step silently failed. Third, the completed_steps list enables checkpointing: if the pipeline crashes mid-run, you can reload the context from disk and resume from the last completed step without re-running expensive earlier LLM calls.

💡 Mental Model: Think of the context object as a baton in a relay race — but one where the baton itself holds all the information accumulated so far. Runners (steps) don't memorize what previous runners told them; they read it off the baton.

A pipeline step then becomes a function with a clear signature:

import json

def step_extract_intent(ctx: PipelineContext, llm_client) -> PipelineContext:
    """
    Classifies the user's raw query into a named intent
    and extracts key entities. Writes results back to ctx.
    """
    prompt = f"""
    Analyze the following user query and return a JSON object with:
    - "intent": one of ["research", "summarize", "compare", "unknown"]
    - "entities": a list of key nouns or named topics mentioned

    Query: {ctx.raw_user_query}

    Return ONLY valid JSON. No explanation.
    """

    raw_output = llm_client.complete(prompt)

    # --- Validate before writing to context ---
    # Use explicit checks, not `assert`: asserts are stripped under `python -O`.
    try:
        parsed = json.loads(raw_output)
        if not isinstance(parsed, dict):
            raise ValueError("output must be a JSON object")
        if not isinstance(parsed.get("intent"), str):
            raise ValueError("intent must be a string")
        if not isinstance(parsed.get("entities"), list):
            raise ValueError("entities must be a list")
    except ValueError as e:  # json.JSONDecodeError is a subclass of ValueError
        # Record the error but don't silently continue
        ctx.errors.append(f"step_extract_intent: validation failed: {e}")
        ctx.intent = "unknown"  # safe fallback
        ctx.extracted_entities = []
    else:
        ctx.intent = parsed["intent"]
        ctx.extracted_entities = parsed["entities"]
        ctx.completed_steps.append("extract_intent")

    return ctx

Notice that this function takes the context, does exactly one thing, and returns the context. It doesn't know about the next step. It doesn't call it. The orchestrator decides what runs next.

Validating at Step Boundaries

The most dangerous moment in a sequential pipeline isn't a crash — it's a silent corruption. A malformed extraction in step 2 that the pipeline treats as successful will be silently embedded into every step that follows. By the time you notice the final output is wrong, the error may have happened four steps back.

🎯 Key Principle: Validate every intermediate output before writing it to the shared context. The step boundary is your last chance to catch an error cheaply, before it propagates.

What does validation look like in practice? It has three levels:

🔧 Structural validation — is the output the right shape? JSON where JSON was expected, a string where a string was expected, a list with at least one item.

🔧 Semantic validation — does the value make sense? An intent field should be one of your known intent names, not an arbitrary string the model hallucinated.

🔧 Dependency validation — does a downstream step's required input exist? Before step 3 runs, confirm that step 2 actually populated retrieved_sources.
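A dependency check can be a small guard that runs before a step. The sketch below assumes the `PipelineContext` shape shown earlier, trimmed to the fields it needs so the block runs on its own. `require_populated` is a hypothetical helper name. It treats `None`, empty lists, and empty strings as "not populated."

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PipelineContext:  # trimmed copy of the context object above
    raw_user_query: str
    intent: Optional[str] = None
    retrieved_sources: list[str] = field(default_factory=list)
    errors: list[str] = field(default_factory=list)


def require_populated(ctx: PipelineContext, step_name: str, *field_names: str) -> bool:
    """Return True if every named field holds a value; otherwise record
    which ones are missing in ctx.errors and return False."""
    missing = [name for name in field_names if getattr(ctx, name) in (None, [], "")]
    if missing:
        ctx.errors.append(f"{step_name}: missing upstream output(s) {missing}")
        return False
    return True


# Step 3 needs both the intent and the sources that step 2 should have retrieved.
ctx = PipelineContext(raw_user_query="compare X and Y", intent="compare")
if require_populated(ctx, "step_synthesize", "intent", "retrieved_sources"):
    print("ready")
else:
    print(ctx.errors[-1])  # step_synthesize: missing upstream output(s) ['retrieved_sources']
```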

⚠️ Common Mistake — Mistake 1: Trusting that json.loads() is sufficient validation. It tells you the string is valid JSON; it says nothing about whether the keys are correct, the values are within expected ranges, or required fields are present. Always check the shape of the parsed object explicitly, or use a schema validation library.

A lightweight but effective pattern is a validation function per step output:

def validate_intent_output(parsed: dict) -> tuple[bool, str]:
    """
    Returns (is_valid, error_message). Error message is empty string if valid.
    """
    valid_intents = {"research", "summarize", "compare", "unknown"}

    if "intent" not in parsed:
        return False, "Missing required field: intent"
    if parsed["intent"] not in valid_intents:
        return False, f"Unknown intent value: {parsed['intent']!r}"
    if "entities" not in parsed:
        return False, "Missing required field: entities"
    if not isinstance(parsed["entities"], list):
        return False, "entities must be a list"

    return True, ""


# Usage inside the step function:
is_valid, error_msg = validate_intent_output(parsed)
if not is_valid:
    ctx.errors.append(f"step_extract_intent: {error_msg}")
    ctx.intent = "unknown"
    ctx.extracted_entities = []
else:
    ctx.intent = parsed["intent"]
    ctx.extracted_entities = parsed["entities"]
    ctx.completed_steps.append("extract_intent")

This pattern keeps validation logic separate from prompt logic, making it testable in isolation. You can write unit tests for validate_intent_output without ever calling an LLM — which is exactly the property you want in a production system.
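For example, a handful of plain `assert`-based tests can pin down the validator's behaviour with no model in the loop. They can be run with pytest or called directly. The validator is repeated here so the block is self-contained, and the test names are illustrative.

```python
def validate_intent_output(parsed: dict) -> tuple[bool, str]:
    """Same validator as above, repeated so this block runs on its own."""
    valid_intents = {"research", "summarize", "compare", "unknown"}
    if "intent" not in parsed:
        return False, "Missing required field: intent"
    if parsed["intent"] not in valid_intents:
        return False, f"Unknown intent value: {parsed['intent']!r}"
    if "entities" not in parsed:
        return False, "Missing required field: entities"
    if not isinstance(parsed["entities"], list):
        return False, "entities must be a list"
    return True, ""


def test_accepts_well_formed_output():
    assert validate_intent_output({"intent": "compare", "entities": ["A", "B"]}) == (True, "")


def test_rejects_hallucinated_intent():
    ok, msg = validate_intent_output({"intent": "analysis", "entities": []})
    assert not ok and "analysis" in msg


def test_rejects_wrong_entities_type():
    ok, msg = validate_intent_output({"intent": "research", "entities": "A, B"})
    assert (ok, msg) == (False, "entities must be a list")


def test_reports_missing_field():
    assert validate_intent_output({"entities": []}) == (False, "Missing required field: intent")


if __name__ == "__main__":
    for test in (test_accepts_well_formed_output, test_rejects_hallucinated_intent,
                 test_rejects_wrong_entities_type, test_reports_missing_field):
        test()
    print("all validator tests passed")
```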

The Router Pattern

Once you have a robust sequential chain, the next structural primitive is the router: a step whose job is to classify an input and select which branch of the pipeline to execute next.

Here's the canonical shape of a prompt-chaining router:

Raw Input
    │
    ▼
┌─────────────────┐
│ Classification  │  ← single LLM call
│      Step       │    returns a branch name
└────────┬────────┘
         │
    branch name
         │
   ┌─────┴──────────────────┐
   │                        │
   ▼                        ▼
┌──────────┐          ┌──────────┐
│ Branch A │          │ Branch B │  ... (N branches)
│ Handler  │          │ Handler  │
└────┬─────┘          └────┬─────┘
     │                     │
     └──────────┬──────────┘
                │
                ▼
          Shared Context
          (updated by
           whichever branch ran)

The classification step is a lightweight LLM call — often just a few tokens of output — that maps input to a named branch. The dispatch table is a plain Python dictionary that maps branch names to handler functions.

from typing import Callable

# --- Branch handlers (each is a step function with the same signature) ---

def handle_research(ctx: PipelineContext, llm_client) -> PipelineContext:
    # Retrieves sources, runs deeper analysis, etc.
    ctx.retrieved_sources = ["source_a", "source_b"]  # simplified
    ctx.completed_steps.append("branch_research")
    return ctx

def handle_summarize(ctx: PipelineContext, llm_client) -> PipelineContext:
    # Summarizes whatever is already in the context
    ctx.completed_steps.append("branch_summarize")
    return ctx

def handle_compare(ctx: PipelineContext, llm_client) -> PipelineContext:
    # Runs a side-by-side comparison step
    ctx.completed_steps.append("branch_compare")
    return ctx

def handle_unknown(ctx: PipelineContext, llm_client) -> PipelineContext:
    # Fallback: ask the user to clarify, or return a safe default
    ctx.errors.append("Router fell back to unknown handler")
    ctx.completed_steps.append("branch_unknown")
    return ctx

# --- The dispatch table ---
ROUTER_DISPATCH: dict[str, Callable] = {
    "research":  handle_research,
    "summarize": handle_summarize,
    "compare":   handle_compare,
    "unknown":   handle_unknown,   # explicit fallback branch
}

# --- The router step itself ---
def step_router(ctx: PipelineContext, llm_client) -> PipelineContext:
    """
    Uses the intent already extracted in ctx to select and execute
    the appropriate branch handler.
    """
    intent = ctx.intent or "unknown"

    # Look up the handler; fall back to 'unknown' if intent not in table
    handler = ROUTER_DISPATCH.get(intent, ROUTER_DISPATCH["unknown"])

    if intent not in ROUTER_DISPATCH:
        ctx.errors.append(
            f"step_router: unrecognized intent {intent!r}, using fallback"
        )

    # Execute the selected branch
    ctx = handler(ctx, llm_client)
    ctx.completed_steps.append("router")
    return ctx

Several design choices here are worth naming explicitly. The dispatch table is a plain dictionary — not a class hierarchy, not a registry object. This makes it trivially inspectable and testable. Every handler has the same signature, which means you can add a new branch by writing one function and adding one dictionary entry. The router step itself doesn't know what the handlers do; it just looks up and calls.

💡 Pro Tip: Keep the classification call and the routing logic in separate functions. The classification call talks to the LLM and produces an intent string. The routing step reads that string and dispatches. If you bundle them together, you lose the ability to unit-test the dispatch logic without mocking an LLM.
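Once the two concerns are split, the dispatch half can become a pure function that you test with stand-in handlers. The sketch below is a hypothetical refactor. `select_handler` pulls the lookup out of `step_router`, and the lambdas stand in for real branch handlers. The router step would call it, log an error when the fallback was used, and then invoke the handler.

```python
from typing import Any, Callable, Mapping, Optional

Handler = Callable[..., Any]


def select_handler(
    intent: Optional[str], dispatch: Mapping[str, Handler]
) -> tuple[Handler, bool]:
    """Pure lookup: return (handler, used_fallback). No LLM involved."""
    if intent is not None and intent in dispatch:
        return dispatch[intent], False
    return dispatch["unknown"], True


# Stand-in handlers: a unit test only needs to know *which* one was chosen.
fake_dispatch: dict[str, Handler] = {
    "research": lambda: "research branch",
    "unknown": lambda: "fallback branch",
}

handler, used_fallback = select_handler("research_query", fake_dispatch)
print(handler(), used_fallback)  # fallback branch True
```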

Handling Router Uncertainty

LLMs don't always return what you expect. Even with a carefully constrained prompt asking for exactly one of four values, a model will occasionally return "research_query" instead of "research", or add punctuation, or return a JSON object when you expected a plain string. Your router needs a principled response to this uncertainty.

There are three options, and the right choice depends on the cost of being wrong:

Option 1: Fallback branch. The dispatch table includes an "unknown" key that handles anything unrecognized. The pipeline continues, logs the uncertainty, and produces a graceful (if conservative) result. Use this when a partial result is better than no result — for example, in a customer-facing application where failing silently is worse than returning a hedged response.

Option 2: Exception. Raise immediately. Log the unrecognized branch name and halt the pipeline. Use this when continuing with uncertain routing would be expensive or dangerous — for example, when branch B triggers an external API call that costs money or has side effects.

Option 3: Re-prompt. Ask the model again, with a stricter prompt that shows the invalid output and explicitly lists the valid choices. This adds latency and cost, but can recover from transient model errors. Limit retries (two or three attempts at most) and fall through to a fallback or exception if the model still can't comply.

def step_classify_with_retry(
    ctx: PipelineContext,
    llm_client,
    max_retries: int = 2
) -> PipelineContext:
    """
    Classification step with re-prompt fallback for invalid outputs.
    """
    valid_intents = list(ROUTER_DISPATCH.keys())
    last_output = None

    for attempt in range(max_retries + 1):
        if attempt == 0:
            prompt = (
                f"Classify this query into exactly one of {valid_intents}.\n"
                f"Return only the intent word, nothing else.\n\n"
                f"Query: {ctx.raw_user_query}"
            )
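        else:
            # Re-prompt: show the model what went wrong. Assuming complete()
            # keeps no conversation history, the query must be repeated here.
            prompt = (
                f"Your previous response {last_output!r} is not one of {valid_intents}.\n"
                f"Return ONLY one of those exact strings and nothing else.\n\n"
                f"Query: {ctx.raw_user_query}"
            )

        # Normalize: trim whitespace, lowercase, and drop stray quotes or periods
        raw_output = llm_client.complete(prompt).strip().lower().strip(".'\"")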
        last_output = raw_output

        if raw_output in ROUTER_DISPATCH:
            ctx.intent = raw_output
            ctx.completed_steps.append("classify_with_retry")
            return ctx

    # All retries exhausted: fall back to 'unknown'
    ctx.errors.append(
        f"Classification failed after {max_retries + 1} attempts; "
        f"last output: {last_output!r}"
    )
    ctx.intent = "unknown"
    ctx.completed_steps.append("classify_with_retry")
    return ctx

⚠️ Common Mistake — Mistake 2: Treating a re-prompt loop as a reliable fix. Re-prompting helps with transient formatting errors, but if the model is consistently returning an unrecognized value, that's a signal that your intent taxonomy doesn't match what the model understands about the input. Adding more retries papers over a design problem. The right fix is to revise the prompt, broaden the dispatch table, or reclassify the inputs.

🤔 Did you know? The uncertainty a router encounters at runtime is often a signal about edge cases in the original problem definition. If the model keeps returning "analysis" when you only defined "research" and "summarize", users may be sending queries that genuinely don't fit either bucket. The router's failure log is diagnostic data about your taxonomy.

Keeping Steps Stateless for Resumability

A pipeline step is stateless when it reads only from the context object and writes only to the context object — it doesn't reach into external state (global variables, module-level caches, open file handles) that would need to be reconstructed if the pipeline crashed and resumed.

Statelessness isn't just about cleanliness. It's what makes checkpointing practical. If step 4 of a six-step pipeline crashes due to a network timeout, you want to be able to reload the context from disk (or a key-value store) and resume from step 4 without re-running steps 1 through 3 — each of which may have cost tokens and time.

Pipeline Execution with Checkpointing

Step 1 → [save ctx to disk] → Step 2 → [save ctx] → Step 3 → CRASH
                                                              │
                                              Reload ctx from last checkpoint
                                                              │
                                                              ▼
                                                          Step 3 (retry)
                                                              │
                                                         [save ctx]
                                                              │
                                                              ▼
                                                          Step 4 ...

For this to work, every step function must be idempotent: running it twice with the same starting context should produce the same result. In practice, LLM calls are not perfectly idempotent, because the model may return slightly different text on a second call. Writing to a typed context object with explicit field assignments keeps the structure of the output idempotent even when the exact text varies. Appends are the exception: assigning ctx.intent twice leaves one value, but appending to ctx.completed_steps twice leaves a duplicate, which is one reason to guard re-entry explicitly.

A practical checkpointing pattern uses completed_steps to guard re-entry:

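import json
import pathlib
from dataclasses import asdict
from typing import Optional

CHECKPOINT_PATH = pathlib.Path("/tmp/pipeline_ctx.json")

def save_checkpoint(ctx: PipelineContext) -> None:
    # Write to a temporary file, then rename it over the checkpoint, so a crash
    # mid-write never leaves a truncated JSON file behind
    tmp_path = CHECKPOINT_PATH.with_suffix(".tmp")
    tmp_path.write_text(json.dumps(asdict(ctx)))
    tmp_path.replace(CHECKPOINT_PATH)

def load_checkpoint() -> Optional[PipelineContext]:
    if not CHECKPOINT_PATH.exists():
        return None
    data = json.loads(CHECKPOINT_PATH.read_text())
    return PipelineContext(**data)

def run_pipeline(raw_query: str, llm_client) -> PipelineContext:
    # Resume only from a checkpoint that belongs to this query; otherwise start fresh
    ctx = load_checkpoint()
    if ctx is None or ctx.raw_user_query != raw_query:
        ctx = PipelineContext(raw_user_query=raw_query)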

    # Each step checks whether it already completed before running
    if "extract_intent" not in ctx.completed_steps:
        ctx = step_extract_intent(ctx, llm_client)
        save_checkpoint(ctx)

    if "router" not in ctx.completed_steps:
        ctx = step_router(ctx, llm_client)
        save_checkpoint(ctx)

    # ... additional steps follow the same pattern

    return ctx

This is a simplified pattern — in production you'd use a more robust store than a local file, and you'd handle concurrent runs carefully. But the structure captures the essential discipline: always write before moving on, always check before re-running.

🎯 Key Principle: The unit of resumability is the step, not the pipeline run. Each step should be independently restartable without depending on in-memory state from a previous step.
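To see the resume behavior end to end without an API key, here is a minimal, self-contained sketch. It assumes a hypothetical cut-down DemoContext in place of the lesson's PipelineContext, a generic runner that records step names itself (the lesson's steps append to completed_steps on their own), and two stub steps, the second of which fails on its first call with a simulated timeout.

```python
import json
import tempfile
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Callable, Optional

@dataclass
class DemoContext:
    # Hypothetical cut-down context for this demo only
    raw_user_query: str
    outputs: dict[str, str] = field(default_factory=dict)
    completed_steps: list[str] = field(default_factory=list)

Step = Callable[[DemoContext], DemoContext]

def save(ctx: DemoContext, path: Path) -> None:
    path.write_text(json.dumps(asdict(ctx)))

def load(path: Path) -> Optional[DemoContext]:
    return DemoContext(**json.loads(path.read_text())) if path.exists() else None

def run(query: str, steps: list[tuple[str, Step]], path: Path) -> DemoContext:
    ctx = load(path)
    if ctx is None or ctx.raw_user_query != query:
        ctx = DemoContext(raw_user_query=query)  # no usable checkpoint: start fresh
    for name, step in steps:
        if name in ctx.completed_steps:
            continue  # finished in an earlier run: skip the (expensive) call
        ctx = step(ctx)
        ctx.completed_steps.append(name)
        save(ctx, path)  # checkpoint before moving on
    return ctx

calls = {"a": 0, "b": 0}

def step_a(ctx: DemoContext) -> DemoContext:
    calls["a"] += 1
    ctx.outputs["a"] = "notes"
    return ctx

def step_b(ctx: DemoContext) -> DemoContext:
    calls["b"] += 1
    if calls["b"] == 1:  # fail only on the very first attempt
        raise TimeoutError("simulated network timeout")
    ctx.outputs["b"] = "draft from " + ctx.outputs["a"]
    return ctx

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "ctx.json"
    steps = [("a", step_a), ("b", step_b)]
    try:
        run("q", steps, path)  # crashes in step_b after step_a was checkpointed
    except TimeoutError:
        pass
    final = run("q", steps, path)  # resumes: step_a is skipped, step_b is retried

print(calls)  # {'a': 1, 'b': 2}
```

The expensive first step runs exactly once across both runs; only the step that crashed is repeated.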

Putting the Pieces Together

A sequential chain with routing now has a clear shape:

raw_user_query
      │
      ▼
┌─────────────────┐
│ Extract Intent   │  Step 1: LLM call → writes intent + entities to ctx
│ + Validate       │  Validation at boundary before ctx is updated
└────────┬────────┘
         │  ctx
      [checkpoint]
         │
         ▼
┌─────────────────┐
│ Router Step      │  Reads ctx.intent → dispatches to handler
└────────┬────────┘
         │
    ┌────┴─────────────────────┐
    │          │               │
    ▼          ▼               ▼
[research] [summarize]    [compare]    ... (fallback: unknown)
    │          │               │
    └────┬─────┴───────────────┘
         │  ctx updated by branch
      [checkpoint]
         │
         ▼
┌─────────────────┐
│ Final Response   │  Step N: uses ctx to generate output
│ Generation       │
└─────────────────┘

Every arrow in this diagram is a context object. Every box is a stateless function. Every major transition is checkpointed. Every LLM output is validated before it's written.

Wrong thinking: "I'll just format the output of each step into the next prompt's input string — it's simpler."

Correct thinking: "I'll write each step's output into a typed context object, validate it, checkpoint it, and let the next step read from that object. The indirection pays for itself the first time I need to debug a failure mid-pipeline."

The indirection of a shared context object feels unnecessary for a two-step pipeline. It becomes indispensable at step five when something fails and you need to know exactly what each prior step produced, replay from step three without re-running earlier expensive calls, and add a new branch to the router without touching any other step's code.

📋 Quick Reference Card:

🔧 Pattern | 🎯 Purpose | ⚠️ Key Risk
🗂️ Shared context object | Accumulates structured state across steps | Skipping validation before writes
🔒 Step boundary validation | Catches malformed outputs before propagation | Trusting raw LLM strings
🔀 Dispatch table router | Maps intent → handler with explicit fallback | No fallback for unknown branch names
🔁 Re-prompt with retry | Recovers from transient classification errors | Masking a taxonomy design problem
💾 Checkpointing | Enables resume without re-running earlier steps | Non-idempotent steps that produce inconsistent re-runs

The patterns in this section — shared context, boundary validation, dispatch table routing, and stateless steps — form a stable foundation that the rest of this lesson builds on. The fan-out/fan-in patterns in the next section depend on the same context object growing richer rather than more fragile. The worked example will use exactly this structure. And the pitfalls section will show what happens, concretely, when these disciplines are skipped.

Ensemble Patterns: Aggregating Multiple Independent Outputs

When a single LLM call produces an answer you cannot fully trust, the instinct is to run it again. Ensemble patterns formalize that instinct into a repeatable architecture: rather than hoping one call gets it right, you generate several independent outputs and then combine them through a deliberate aggregation step. The logic is borrowed from classical machine learning, where ensembles of weaker models often outperform any of their individual members on well-defined tasks. The translation to LLM pipelines, however, comes with its own cost structure, failure modes, and design constraints that are worth understanding precisely before reaching for this pattern.

What an Ensemble Pipeline Looks Like

At the structural level, an ensemble pipeline has two stages. The first stage is a fan-out: the same prompt (or a set of slightly varied prompts) is sent to N independent calls, which run in parallel and produce N independent candidate outputs. The second stage is an aggregation: a separate mechanism combines those candidates into a single final answer. Neither stage knows about the other's internals: the fan-out calls do not communicate with each other, and the aggregation step receives all candidates at once. For the calls to be genuinely independent draws, sample at a non-zero temperature or vary the prompts; N identical calls at temperature 0 tend to return nearly identical answers, and aggregating them adds cost without adding information.

Prompt ──┬──► Call A ──► output_A ──┐
         ├──► Call B ──► output_B ──┼──► Aggregator ──► Final Answer
         └──► Call C ──► output_C ──┘
              (parallel)               (sequential)

This diagram captures the essential shape: fan-out runs in parallel to contain latency, and the aggregation step runs sequentially afterward. The aggregator is either an LLM call, which adds one more round-trip to the pipeline, or a deterministic function such as a vote, which adds essentially no latency.
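A minimal sketch of that fan-out/fan-in shape using asyncio follows. fake_llm_call is a hypothetical stand-in for an async LLM client call (it sleeps to simulate latency and returns scripted answers), and the aggregator is a deterministic vote; substitute your own client and aggregation strategy.

```python
import asyncio
import time
from collections import Counter

async def fake_llm_call(prompt: str, call_index: int) -> str:
    """Hypothetical stand-in for an async LLM client call."""
    await asyncio.sleep(0.2)  # simulated network round-trip
    scripted = ["billing", "Billing", "shipping"]  # pretend the independent draws disagree
    return scripted[call_index % len(scripted)]

def aggregate_by_vote(candidates: list[str]) -> str:
    """Deterministic fan-in: no extra LLM round-trip."""
    normalized = [c.strip().lower() for c in candidates]
    winner, _ = Counter(normalized).most_common(1)[0]
    return winner

async def run_ensemble(prompt: str, n: int = 3) -> tuple[str, list[str]]:
    # Fan-out: all n calls are in flight at once; gather returns results in call order
    results = await asyncio.gather(*(fake_llm_call(prompt, i) for i in range(n)))
    candidates = list(results)
    # Fan-in: the aggregator receives every candidate at once
    return aggregate_by_vote(candidates), candidates

start = time.perf_counter()
answer, candidates = asyncio.run(run_ensemble("Classify: 'I was charged twice'"))
elapsed = time.perf_counter() - start
print(answer, candidates, f"{elapsed:.2f}s")
```

Because asyncio.gather keeps all three calls in flight together, the elapsed time is close to one simulated round-trip (about 0.2 s) rather than three.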

The Three Aggregation Strategies

There is no single correct way to combine N outputs. Three strategies cover most practical use cases, and they differ in how much they trust determinism versus language understanding.

Strategy 1: Majority Vote

Majority vote treats the N outputs as ballots and selects whichever answer appears most frequently. This is purely deterministic — no additional LLM call is required. It works best when answers are discrete and comparable: multiple-choice selection, classification labels, short factual answers, numeric results, or structured outputs where two candidates can be tested for exact or near-exact equality.

from collections import Counter

def majority_vote(candidates: list[str]) -> str:
    """
    Select the most common answer from a list of candidate strings.
    Ties are broken by order of first appearance.
    """
    if not candidates:
        raise ValueError("candidates list must not be empty")
    
    # Strip surrounding whitespace and lowercase before counting
    normalized = [c.strip().lower() for c in candidates]
    count = Counter(normalized)
    most_common_value, _ = count.most_common(1)[0]
    
    # Return the original (un-normalized) version of the winner
    for original, norm in zip(candidates, normalized):
        if norm == most_common_value:
            return original

This function strips surrounding whitespace and lowercases each candidate before counting, then returns the original formatting of the winning answer. For tasks like classifying a customer support ticket into one of five categories, majority vote across three calls needs no extra aggregation call and is often effective. If the model is genuinely uncertain, the spread of votes tells you so; if it is confident, all three agree and you have paid three times the token cost for that confirmation.
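One way to make that spread of votes usable is to return an agreement ratio alongside the winner. The sketch below is a variant of majority_vote; the name majority_vote_with_agreement and the escalation policy are illustrative assumptions rather than part of the lesson's code.

```python
from collections import Counter

def majority_vote_with_agreement(candidates: list[str]) -> tuple[str, float]:
    """Return the winning answer (original formatting) and the share of votes it received."""
    if not candidates:
        raise ValueError("candidates list must not be empty")
    normalized = [c.strip().lower() for c in candidates]
    winner, votes = Counter(normalized).most_common(1)[0]
    # index() finds the first occurrence, matching first-appearance tie-breaking
    original = candidates[normalized.index(winner)]
    return original, votes / len(candidates)

answer, agreement = majority_vote_with_agreement(["Billing", "billing ", "Shipping"])
if agreement < 1.0:  # hypothetical policy: escalate anything short of unanimity
    print(f"Split vote ({agreement:.2f}) for {answer!r}; flag for review")
```

The agreement ratio is a cheap confidence signal: a unanimous vote can flow straight through, while a split vote can be routed to a judge call or a human.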

⚠️ Common Mistake — Mistake 1: Applying majority vote to long-form outputs. Majority vote assumes candidates can be meaningfully compared for equality. A 500-word summary generated three times will almost never match character-for-character, so every candidate gets exactly one vote and the "winner" is whichever one the tie-break picks (with the function above, simply the first). Reserve this strategy for short, discrete answers.

Strategy 2: Judge Call

A judge call is a separate LLM prompt that receives all N candidates and selects the best one, returning either the chosen answer verbatim or an index pointing to it. Unlike majority vote, the judge can reason about quality, correctness, clarity, or adherence to instructions — making it useful for tasks where "best" is not the same as "most common."

import json

def build_judge_prompt(question: str, candidates: list[str]) -> str:
    """
    Build a judge prompt that receives all candidates simultaneously
    to avoid positional bias from sequential presentation.
    """
    candidate_block = "\n\n".join(
        f"[Candidate {i+1}]\n{text}" for i, text in enumerate(candidates)
    )
    return f"""You are an impartial judge evaluating candidate answers to the question below.

Question: {question}

Candidates (evaluate all before deciding):
{candidate_block}

Respond with a JSON object in this exact format:
{{"selected": <1-based index of best candidate>, "reason": "<one sentence>"}}

Do not prefer any candidate simply because of its position. Base your decision
only on accuracy, completeness, and clarity."""


def parse_judge_response(response_text: str, candidates: list[str]) -> str:
    """
    Parse the judge's JSON response and return the selected candidate.
    """
    data = json.loads(response_text)
    selected_index = int(data["selected"]) - 1  # accept "2" as well as 2; convert to 0-based
    if not 0 <= selected_index < len(candidates):
        raise ValueError(f"Judge returned out-of-range index: {data['selected']}")
    return candidates[selected_index]

The prompt deliberately tells the judge to evaluate all candidates before deciding — a small instruction that addresses a real failure mode discussed below. The structured JSON response also makes the downstream parsing deterministic rather than relying on the model to write "I choose Candidate 2" in freeform text.

Strategy 3: Merge (Synthesis) Call

A merge call does not select a winner; it synthesizes all N candidates into a single output intended to be better than any individual input. This is the most expensive aggregation strategy, because the merge prompt must read all N candidates in full and then generate a new response, but it is also the most powerful for tasks where each candidate contributes something the others miss.

Imagine asking three calls to identify security vulnerabilities in a code snippet. Call A flags an SQL injection risk. Call B flags an exposed API key. Call C flags both but also reports a false positive. A merge call reading all three can produce a deduplicated, correctly reasoned final report. Neither majority vote nor a judge call could achieve that, because the goal is not to pick the best individual report but to combine the partial insights.

💡 Real-World Example: Code review, multi-angle research synthesis, and technical documentation generation are tasks where the merge strategy earns its cost. Selecting a single winner ("here is the best one") leaves the other candidates' unique contributions on the table; synthesis captures them.

When Ensembles Help — and When They Do Not

🎯 Key Principle: Ensembles reduce variance at the cost of increased token spend and latency. They are worth that cost only when variance is the dominant failure mode.

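For tasks with a clear, verifiable correct answer (arithmetic reasoning, classification, factual retrieval, structured data extraction), a single LLM call can land anywhere in a distribution of possible responses. Suppose the model reasons correctly six times out of ten. Running N=3 independent calls and taking the majority reduces the chance that a wrong answer wins, because the model would have to err on at least two of the three draws. The gain is real but bounded: if the calls were fully independent and every error counted as a vote against the right answer, a 60%-accurate call would become about 65% accurate under a three-way vote, and an 80%-accurate call about 90%. Real calls share a prompt and a model, so their errors are correlated, which shrinks the gain. Ensemble patterns genuinely improve reliability here, but by a margin you should measure rather than assume.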
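The arithmetic behind those figures is a binomial calculation. This sketch computes the probability that a strict majority of n calls is correct under the idealized assumptions stated in its docstring; it illustrates the trend and is not a prediction of real-world gains.

```python
from math import comb

def majority_accuracy(p: float, n: int) -> float:
    """
    Probability that a strict majority of n calls is correct, if each call is
    independently correct with probability p. Idealized: every error counts as a
    vote against the right answer, and correlated errors are ignored.
    """
    needed = n // 2 + 1
    return sum(comb(n, k) * p**k * (1 - p) ** (n - k) for k in range(needed, n + 1))

for p in (0.6, 0.8):
    print(p, [round(majority_accuracy(p, n), 3) for n in (1, 3, 5)])
# 0.6 [0.6, 0.648, 0.683]
# 0.8 [0.8, 0.896, 0.942]
```

Voting helps more when the single call is already fairly accurate; at p = 0.5 a majority of three is still exactly 0.5, so the vote adds cost without adding accuracy.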

For open-ended creative tasks — writing a poem, brainstorming product names, generating marketing copy — there is no correct answer against which to measure. Every call produces a valid but different output, and "majority vote" is meaningless. A judge call adds cost and introduces the judge's own biases. A merge call may homogenize creative outputs into something blandly average. On these tasks, ensembles often add noise rather than signal.

Wrong thinking: "More calls always means better output — I should use ensembles everywhere."

Correct thinking: "Ensembles reduce output variance. I should use them only on tasks where variance is causing failures, and where the cost of those failures exceeds the cost of running N calls."

A practical heuristic: if you can write a test that objectively scores the output (the extracted date is either correct or not, the classification label either matches the ground truth or not), ensembles are likely to help. If the output quality is primarily a matter of taste or subjective judgment, ensembles are unlikely to justify their cost.

Cost and Latency Arithmetic

The economics of ensemble patterns are straightforward enough to reason about explicitly, which makes the break-even point concrete rather than intuitive.

For a baseline single-call pipeline with prompt tokens P, completion tokens C, and per-token cost r:

Baseline cost = (P + C) × r
Baseline latency = one round-trip

For an ensemble with N=3 parallel calls followed by one aggregation call that consumes A tokens:

Ensemble cost = 3 × (P + C) × r + A × r
Ensemble latency = one parallel round-trip + one aggregation round-trip

Because the three calls run in parallel, latency is roughly the time of the slowest single call plus the aggregation call — not three sequential round-trips. This means ensemble patterns are relatively latency-friendly when N calls are parallelized, but they are never token-cost-friendly: you are always paying at minimum N times the input and output tokens.

💡 Mental Model: Think of the ensemble as buying an insurance policy. The premium is the extra token cost. The payout is reduced probability of a wrong answer reaching production. The policy is worth buying when the cost of a wrong answer (user frustration, downstream pipeline failure, incorrect data written to a database) exceeds the premium.

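from typing import Optional

def ensemble_cost_analysis(
    prompt_tokens: int,
    completion_tokens: int,
    cost_per_token: float,
    n_calls: int = 3,
    aggregation_tokens: Optional[int] = None,  # pass 0 for a deterministic aggregator such as majority vote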
) -> dict:
    """
    Compute baseline vs. ensemble token costs to support a go/no-go decision.
    All token counts are estimates; use measured values from your actual prompts.
    """
    if aggregation_tokens is None:
        # Rough estimate: aggregator reads all N completions plus its own output
        aggregation_tokens = (n_calls * completion_tokens) + completion_tokens

    baseline_tokens = prompt_tokens + completion_tokens
    ensemble_tokens = (n_calls * (prompt_tokens + completion_tokens)) + aggregation_tokens

    baseline_cost = baseline_tokens * cost_per_token
    ensemble_cost = ensemble_tokens * cost_per_token
    multiplier = ensemble_cost / baseline_cost

    return {
        "baseline_tokens": baseline_tokens,
        "ensemble_tokens": ensemble_tokens,
        "baseline_cost_usd": round(baseline_cost, 6),
        "ensemble_cost_usd": round(ensemble_cost, 6),
        "cost_multiplier": round(multiplier, 2),
    }

# Example: a moderately sized classification prompt
result = ensemble_cost_analysis(
    prompt_tokens=800,
    completion_tokens=50,
    cost_per_token=0.000002,  # illustrative; use your model's actual rate
    n_calls=3,
)
print(result)
# {'baseline_tokens': 850, 'ensemble_tokens': 2750, 'baseline_cost_usd': 0.0017,
#  'ensemble_cost_usd': 0.0055, 'cost_multiplier': 3.24}

Running this analysis before building an ensemble pipeline makes the decision concrete. A cost multiplier of roughly 3.2x (exactly 3x if the aggregator is a deterministic majority vote, i.e. aggregation_tokens=0) may be prohibitive on a high-volume classification task. On a low-volume, high-stakes document review task, it may be trivially affordable. Knowing the number prevents the ensemble from becoming a default pattern applied indiscriminately.
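The insurance framing from the mental model reduces to one inequality: the expected cost of the errors an ensemble prevents must exceed its extra token spend per request. A minimal sketch, in which every input (error rates, cost of an error, extra spend) is a hypothetical estimate you would measure on your own traffic:

```python
def ensemble_pays_off(
    single_error_rate: float,
    ensemble_error_rate: float,
    cost_of_error_usd: float,
    extra_token_cost_usd: float,
) -> bool:
    """
    Per-request break-even check: expected cost of the errors the ensemble
    prevents versus the extra tokens it spends. All inputs are estimates.
    """
    expected_savings = (single_error_rate - ensemble_error_rate) * cost_of_error_usd
    return expected_savings > extra_token_cost_usd

# Hypothetical numbers: errors drop from 40% to 35%, and the ensemble
# costs an extra $0.0038 per request
print(ensemble_pays_off(0.40, 0.35, cost_of_error_usd=0.50, extra_token_cost_usd=0.0038))  # True
print(ensemble_pays_off(0.40, 0.35, cost_of_error_usd=0.05, extra_token_cost_usd=0.0038))  # False
```

The same five-point error reduction justifies the ensemble when a wrong answer costs fifty cents, but not when it costs five.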

⚠️ Common Mistake — Mistake 2: Assuming N=3 is always the right fan-out width. N=3 is a reasonable default for majority vote, since an odd N rules out ties when there are only two possible answers; with three or more labels, though, three calls can still split three ways with no majority at all. For judge and merge strategies, increasing N beyond 3 tends to yield diminishing returns while the aggregation prompt grows linearly in length. If the aggregation call's context window becomes a constraint, quality may degrade before it improves.

Designing the Judge or Merge Prompt to Avoid Positional Bias

The most commonly overlooked failure mode in ensemble design is positional bias: the tendency of LLMs to favor whichever candidate appears first (or sometimes last) in the prompt, independent of quality. This is not a minor edge case — it is a well-observed behavioral pattern that will systematically distort your judge or merge results if you do not design against it.

The first fix is structural: always present all candidates simultaneously, not sequentially. If you show the judge Candidate A first and ask "is this good?" before showing Candidate B, you have introduced an asymmetry that has nothing to do with the candidates' actual quality. The judge prompt shown earlier in this section demonstrates the correct structure: all candidates appear in a single prompt, labeled neutrally, before any evaluation instruction. Simultaneous presentation removes that asymmetry, but a judge can still favor a particular slot within a single prompt, which is what the following techniques address.

Beyond simultaneous presentation, three additional techniques help:

🔧 Randomize candidate order across runs. If you run the same ensemble twice and the "best" answer changes based on which candidate appeared first in the prompt, you have a positional bias problem. Shuffling the order before constructing the judge prompt and checking for consistency is a cheap diagnostic.

🔧 Use neutral, consistent labels. Labels like "Candidate 1, Candidate 2, Candidate 3" are better than labels that carry implicit hierarchy ("Primary answer, Alternative, Backup"). The latter signals to the model which answer to prefer before it has read a word of the content.

🔧 Instruct the judge explicitly. The instruction "Do not prefer any candidate simply because of its position" is not guaranteed to eliminate bias, but it may reduce it compared to prompts that say nothing about evaluation order; measure the effect on your own data rather than assuming it. Pair this instruction with a request for a brief reason: requiring the judge to articulate a rationale pushes it to engage with content rather than defaulting to position.
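The randomization check from the first technique can be sketched as follows. judge_fn is a hypothetical callable wrapping your judge prompt that returns the 0-based slot it prefers; the two toy judges exist only to show how an order-insensitive judge and a purely position-biased judge look to the diagnostic.

```python
import random
from collections import Counter
from typing import Callable, Optional

# Hypothetical judge interface: receives candidates in display order,
# returns the 0-based slot it prefers (e.g. a wrapper around your judge prompt)
JudgeFn = Callable[[list[str]], int]

def judge_shuffled(candidates: list[str], judge_fn: JudgeFn, seed: Optional[int] = None) -> int:
    """Show candidates in random order; map the judge's pick back to the original index."""
    rng = random.Random(seed)
    order = list(range(len(candidates)))
    rng.shuffle(order)  # order[slot] = original index displayed at that slot
    shown = [candidates[i] for i in order]
    return order[judge_fn(shown)]

def pick_consistency(candidates: list[str], judge_fn: JudgeFn, trials: int = 6) -> float:
    """Share of shuffled trials that agree with the most frequent pick (1.0 = order-insensitive)."""
    picks = [judge_shuffled(candidates, judge_fn, seed=t) for t in range(trials)]
    return Counter(picks).most_common(1)[0][1] / trials

def content_judge(shown: list[str]) -> int:
    # Toy judge that looks only at content (prefers the longest answer)
    return shown.index(max(shown, key=len))

def first_slot_judge(shown: list[str]) -> int:
    # Toy judge with pure positional bias
    return 0

cands = ["short", "a much longer and more complete answer", "medium answer"]
print(pick_consistency(cands, content_judge))  # 1.0
print(pick_consistency(cands, first_slot_judge))
```

A consistency well below 1.0 means the judge's verdict depends on presentation order rather than content.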

🤔 Did you know? Positional bias in LLM-as-judge settings is often linked to the same attention-allocation dynamics that cause models to weight information near the beginning and end of a long context more heavily than information in the middle, sometimes called the "lost in the middle" effect in the research literature. Designing prompts that counteract this tendency is part of careful ensemble engineering.

For merge calls, the simultaneous presentation requirement applies equally, but the additional challenge is ensuring the merge prompt does not simply echo whichever candidate it encounters first. One useful technique is to ask the merge call to explicitly list the unique contributions of each candidate before synthesizing — this forces it to read all inputs before generating:

You are synthesizing three independent analyses into one final report.

Before writing the synthesis, briefly note what each candidate contributes
that the others do not. Then write a unified final answer that incorporates
all distinct valid points.

[Candidate 1]
{output_a}

[Candidate 2]
{output_b}

[Candidate 3]
{output_c}

The "note contributions first" instruction acts as a forcing function — the model cannot begin synthesis until it has engaged with all three inputs, which reduces the tendency to front-weight the first candidate.
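The same template generalizes to any number of candidates. In this sketch, build_merge_prompt is a hypothetical helper modeled on build_judge_prompt: all candidates share one prompt under neutral numeric labels, and the "note contributions first" instruction comes before them. The clause about omitting unsupported claims is an addition aimed at cases like the false positive in the security-review example.

```python
def build_merge_prompt(task: str, candidates: list[str]) -> str:
    """
    Build a synthesis prompt for any number of candidates: all candidates in one
    prompt, neutral numeric labels, and a "note contributions first" instruction.
    """
    if not candidates:
        raise ValueError("candidates list must not be empty")
    blocks = "\n\n".join(
        f"[Candidate {i}]\n{text}" for i, text in enumerate(candidates, start=1)
    )
    return (
        f"You are synthesizing {len(candidates)} independent analyses into one final report.\n\n"
        f"Task: {task}\n\n"
        "Before writing the synthesis, briefly note what each candidate contributes\n"
        "that the others do not. Then write a unified final answer that incorporates\n"
        "all distinct valid points and omits any claim you judge to be unsupported.\n\n"
        f"{blocks}"
    )

print(build_merge_prompt(
    "Identify security vulnerabilities in the snippet.",
    ["Flags an SQL injection risk.", "Flags an exposed API key.", "Flags both, plus one false positive."],
))
```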

⚠️ Common Mistake — Mistake 3: Presenting candidates one at a time in a multi-turn conversation. Some implementations show the judge Candidate 1 in message turn 1, ask for a score, then show Candidate 2 in turn 2, and so on — finally asking which scored highest. This approach compounds positional bias with recency effects in the conversation context, and it also means the judge is comparing a fresh reading of the current candidate against a memory of earlier ones. Present all candidates in a single prompt.

Putting It Together: When to Reach for Ensemble Patterns

Ensemble patterns occupy a specific niche in agentic pipeline design. They are not a universal quality improvement — they are a targeted tool for reducing variance on tasks where the cost of that variance is high and the answer space is structured enough for meaningful aggregation.

📋 Quick Reference Card: Ensemble Strategy Selection

🎯 Task Type | 📊 Answer Space | 🔧 Best Strategy | 💰 Cost Impact
🔒 Classification / labeling | Discrete, finite | Majority vote | ~3x tokens, no extra call
📚 Factual Q&A | Short, verifiable | Majority vote or judge | ~3–4x tokens
🔧 Code generation | Evaluable by tests | Judge call | ~3–4x tokens
🧠 Research synthesis | Multi-faceted | Merge call | ~4–6x tokens
🎯 Creative writing | Open-ended | Avoid ensembles | Not worth cost

The break-even decision reduces to three questions: Is the task's answer space structured enough for meaningful comparison? Is variance in single-call output actually causing failures? Does the cost of those failures exceed the ensemble's token premium? If all three answers are yes, ensembles are a strong candidate. If any answer is no, consider whether better prompting or a more targeted retrieval step would address the root problem at lower cost.

🧠 Mnemonic — VAC: When evaluating an ensemble, check Variance (is it high?), Answer space (is it structured?), and Cost (does the break-even work?). All three must clear before the pattern earns its place in your pipeline.

Ensemble patterns work best when combined with the fan-out/fan-in infrastructure introduced conceptually earlier in this lesson. The next section grounds all of these patterns — sequential chains, routers, and ensembles — in a single worked example where you can see how they compose in practice.

Practical Pipeline Implementation: A Worked Example

Theory becomes durable only when you can trace it through running code. The sections before this one introduced sequential chaining, routers, and ensemble patterns as abstract structures. Now we build a concrete pipeline — a research-and-draft agent — that classifies an incoming request, routes it through an appropriate fact-gathering branch, and produces a finished draft. Every design decision made here connects directly to the patterns already covered, so you can see exactly where the seams are and how the parts fit together.

The example is deliberately scoped to be readable in one sitting. It omits OAuth, rate-limit back-off, and persistent storage — not because those details don't matter in production, but because they would obscure the structural lesson. Where those simplifications matter, the text says so.

The Pipeline We're Building

The agent accepts a user query and produces a polished short-form response. The flow has four steps:

  1. Classify — a router decides whether the query needs factual lookup or creative synthesis
  2. Research — the appropriate branch gathers raw material
  3. Draft — a writing step turns raw material into a response
  4. Validate — a lightweight check confirms the draft is on-topic

User Query
    │
    ▼
┌─────────────┐
│  CLASSIFY   │  ← router step
└──────┬──────┘
       │
  ┌────┴─────┐
  │          │
  ▼          ▼
┌──────┐  ┌──────────┐
│FACT  │  │CREATIVE  │  ← research branches
│LOOKUP│  │SYNTHESIS │
└──┬───┘  └────┬─────┘
   └─────┬─────┘
         ▼
   ┌──────────┐
   │  DRAFT   │  ← drafting step
   └────┬─────┘
        ▼
   ┌──────────┐
   │ VALIDATE │  ← quality gate
   └────┬─────┘
        ▼
   Final Response

Each box in the diagram is a step function: a Python callable that accepts the shared context object and returns an updated version of it. This uniform signature is the load-bearing design decision, so let's look at it first.

Defining the Context Type and Step Contract

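The most important structural choice in any pipeline is deciding what flows between steps. Using a plain dict is tempting but fragile: a typo in a key name read with .get() produces a silent None rather than an error. Using a typed dataclass gives you IDE autocompletion and lets a type checker flag a misspelled field before the code runs; at runtime, reading a misspelled attribute raises AttributeError at the point of the typo rather than three steps later, when something unexpectedly missing causes a crash. (Assigning to a misspelled attribute on an ordinary dataclass still succeeds silently, so a type checker remains worth running.)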

from __future__ import annotations
import time
import json
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

logger = logging.getLogger("pipeline")


@dataclass
class PipelineContext:
    """Shared state that flows through every step in the pipeline."""
    query: str                              # original user query (never mutated)
    branch: Optional[str] = None           # set by the router: "fact" or "creative"
    research_notes: Optional[str] = None   # raw material gathered by the research step
    draft: Optional[str] = None            # output of the drafting step
    validated: bool = False                # set to True by the validation step
    error: Optional[str] = None            # populated if a step fails unrecoverably
    metadata: dict[str, Any] = field(default_factory=dict)  # per-step telemetry


# A step is any function with this signature.
StepFn = Callable[[PipelineContext], PipelineContext]

With PipelineContext defined, every step in the pipeline has the same contract: take a context, return a context. This uniformity pays off in three places. First, error handling can live in a single wrapper rather than being duplicated across every step. Second, you can slot steps in or out of the pipeline list without touching the other steps. Third, logging — which we'll add shortly — can be written once and applied everywhere.

🎯 Key Principle: A uniform step signature — one type in, same type out — is what makes pipelines composable. The moment different steps accept different argument shapes, you need glue code at every junction, and that glue code becomes the thing that fails.
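The single wrapper that the uniform signature enables can be sketched like this. It runs without an API key: Ctx is a hypothetical cut-down stand-in for PipelineContext, fake_draft, fake_validate, and always_fails are stubs, and stopping at the first failure while recording per-step timings in metadata is one reasonable policy, not the lesson's prescribed one.

```python
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

logger = logging.getLogger("pipeline")

@dataclass
class Ctx:
    # Hypothetical cut-down stand-in for PipelineContext
    query: str
    draft: Optional[str] = None
    error: Optional[str] = None
    metadata: dict[str, Any] = field(default_factory=dict)

StepFn = Callable[[Ctx], Ctx]

def run_steps(ctx: Ctx, steps: list[StepFn]) -> Ctx:
    """Run steps in order; one wrapper handles timing and failure for every step."""
    for step in steps:
        name = step.__name__
        start = time.perf_counter()
        try:
            ctx = step(ctx)
        except Exception as exc:  # stop the pipeline at the first failing step
            ctx.error = f"{name}: {exc}"
            logger.error("step %s failed: %s", name, exc)
            break
        finally:
            # Runs on success and on failure, so every attempted step is timed
            ctx.metadata[f"{name}_seconds"] = round(time.perf_counter() - start, 4)
    return ctx

def fake_draft(ctx: Ctx) -> Ctx:
    ctx.draft = f"Draft answer for: {ctx.query}"
    return ctx

def fake_validate(ctx: Ctx) -> Ctx:
    if ctx.query.lower() not in ctx.draft.lower():
        raise ValueError("draft is off-topic")
    return ctx

def always_fails(ctx: Ctx) -> Ctx:
    raise RuntimeError("simulated timeout")

ok = run_steps(Ctx(query="What is RAG?"), [fake_draft, fake_validate])
failed = run_steps(Ctx(query="What is RAG?"), [fake_draft, always_fails, fake_validate])
print(failed.error)  # always_fails: simulated timeout
```

Adding, removing, or reordering steps only changes the list passed to run_steps; no step needs to know about error handling or timing.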

Implementing the Router Step

The classification step reads the query, decides which branch to activate, and writes that decision into ctx.branch. router_step then reads ctx.branch and dispatches to the matching research function.

import openai  # assumes openai package installed; adapt to your preferred client

client = openai.OpenAI()  # reads OPENAI_API_KEY from environment


def classify_step(ctx: PipelineContext) -> PipelineContext:
    """
    Router step. Classifies the query as 'fact' (needs verifiable information)
    or 'creative' (needs synthesis, opinion, or imaginative output).
    Writes the decision to ctx.branch.
    """
    system_prompt = (
        "You are a query classifier. "
        "Respond with exactly one word: 'fact' if the query asks for verifiable "
        "information, or 'creative' if it asks for synthesis, opinion, or creative writing. "
        "No other output."
    )
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": ctx.query},
        ],
        temperature=0,      # minimizes, but does not eliminate, run-to-run variation in the label
        max_tokens=5,
    )
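    raw = (response.choices[0].message.content or "").strip().lower().strip(".'\"")
    if raw in ("fact", "creative"):
        ctx.branch = raw
    else:
        # Unrecognized label: take the 'creative' fallback branch, but record the raw
        # output so misclassifications show up in telemetry instead of disappearing
        ctx.branch = "creative"
        ctx.metadata["classify_unrecognized_output"] = raw
    return ctx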


def fact_lookup_step(ctx: PipelineContext) -> PipelineContext:
    """
    Research branch for factual queries. In a real system this would call
    a retrieval tool or search API; here it uses the LLM's internal knowledge
    and explicitly asks for bullet-point notes rather than a finished answer.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a research assistant. Produce concise bullet-point notes "
                    "covering the key facts relevant to the query. Do NOT write a final answer. "
                    "Aim for 5-8 bullets."
                ),
            },
            {"role": "user", "content": ctx.query},
        ],
        temperature=0.2,
        max_tokens=300,
    )
    ctx.research_notes = response.choices[0].message.content.strip()
    return ctx


def creative_synthesis_step(ctx: PipelineContext) -> PipelineContext:
    """
    Research branch for creative or synthetic queries. Generates a set of
    raw ideas and angles before the drafting step turns them into prose.
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a creative strategist. Brainstorm 4-6 distinct angles, "
                    "metaphors, or framings for the query. Output as a numbered list. "
                    "Do NOT write a polished answer yet."
                ),
            },
            {"role": "user", "content": ctx.query},
        ],
        temperature=0.8,   # higher temperature is appropriate for generative ideation
        max_tokens=400,
    )
    ctx.research_notes = response.choices[0].message.content.strip()
    return ctx


def router_step(ctx: PipelineContext) -> PipelineContext:
    """
    Dispatches to the correct research branch based on ctx.branch.
    This function is itself a step, so it fits in the pipeline list like any other.
    """
    if ctx.branch == "fact":
        return fact_lookup_step(ctx)
    return creative_synthesis_step(ctx)


def draft_step(ctx: PipelineContext) -> PipelineContext:
    """Turns research notes into a finished short-form response."""
    response = client.chat.completions.create(
        model="gpt-4o",     # upgrades to a stronger model for the final output
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a skilled writer. Using the research notes provided, "
                    "write a clear, engaging response to the user's query. "
                    "Stay under 200 words. Address the user directly."
                ),
            },
            {
                "role": "user",
                "content": f"Query: {ctx.query}\n\nResearch notes:\n{ctx.research_notes}",
            },
        ],
        temperature=0.5,
        max_tokens=400,
    )
    ctx.draft = response.choices[0].message.content.strip()
    return ctx


def validate_step(ctx: PipelineContext) -> PipelineContext:
    """
    Lightweight relevance check. Sets ctx.validated = True if the draft
    addresses the original query; raises ValueError otherwise (triggering retry).
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a quality checker. Does the draft adequately address the query? "
                    "Reply with exactly one word: 'yes' or 'no'."
                ),
            },
            {
                "role": "user",
                "content": f"Query: {ctx.query}\n\nDraft:\n{ctx.draft}",
            },
        ],
        temperature=0,
        max_tokens=5,
    )
    verdict = response.choices[0].message.content.strip().lower()
    if "yes" in verdict:
        ctx.validated = True
        return ctx
    raise ValueError(f"Validation failed: draft does not address query '{ctx.query}'")

Notice that router_step is itself a step — it accepts and returns a PipelineContext — even though internally it dispatches to one of two sub-steps. This keeps the top-level pipeline list clean: it doesn't need to know about branching logic at all.
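Because router_step is an ordinary step, its if/else can also be written as a lookup table, which turns adding a branch into a one-line change. The sketch below makes several assumptions. It uses a stripped-down stand-in for PipelineContext. Stub functions replace the LLM calls. The `BRANCHES` table and the `data_analysis_branch` route are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Ctx:
    """Simplified stand-in for PipelineContext (illustration only)."""
    query: str
    branch: Optional[str] = None
    research_notes: Optional[str] = None


StepFn = Callable[[Ctx], Ctx]


def fact_branch(ctx: Ctx) -> Ctx:
    ctx.research_notes = f"facts about: {ctx.query}"  # stub for fact_lookup_step
    return ctx


def creative_branch(ctx: Ctx) -> Ctx:
    ctx.research_notes = f"angles on: {ctx.query}"  # stub for creative_synthesis_step
    return ctx


def data_analysis_branch(ctx: Ctx) -> Ctx:
    ctx.research_notes = f"analysis plan for: {ctx.query}"  # hypothetical third branch
    return ctx


# Adding a branch is one new entry here; router_step itself never changes.
BRANCHES: dict[str, StepFn] = {
    "fact": fact_branch,
    "creative": creative_branch,
    "data_analysis": data_analysis_branch,
}


def router_step(ctx: Ctx) -> Ctx:
    """Dispatch on ctx.branch; unknown labels fall back to the creative branch."""
    handler = BRANCHES.get(ctx.branch or "", creative_branch)
    return handler(ctx)


result = router_step(Ctx(query="Q3 churn by region", branch="data_analysis"))
print(result.research_notes)  # analysis plan for: Q3 churn by region
```

The classifier's prompt would also need to be taught the new label. The default in the dictionary lookup keeps an unexpected label from crashing the run, which matches the original router's fall-through to the creative branch.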

⚠️ Common Mistake: Passing the raw query directly to the drafting step and skipping the research step when the router's label says "creative." Creative synthesis still benefits from a structured ideation pass before drafting — skipping it collapses the two-stage process into a single call and loses the quality advantage the pipeline was designed to provide.

Adding a Retry Wrapper

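Steps occasionally fail for reasons a second attempt can fix: a transient API error, an output in the wrong shape, or a borderline validation verdict. A retry wrapper gives a step one additional chance (with the default max_attempts=2) before propagating the exception to the caller. Keep in mind what a retry actually re-runs. Retrying validate_step alone re-checks the same draft at temperature 0, which will usually return the same verdict. Getting a second attempt at the draft itself requires retrying the drafting and validation steps together.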

The wrapper is generic — it wraps any StepFn — which means you write it once and apply it to whichever steps need it.

def with_retry(step_fn: StepFn, max_attempts: int = 2) -> StepFn:
    """
    Returns a new step function that calls step_fn up to max_attempts times.
    On the first failure it logs a warning and tries once more.
    On the second failure it sets ctx.error and re-raises the exception.

    Simplified: this retry logic re-runs the same step unchanged. A more
    robust version would inject error feedback into the context so the
    re-prompt can incorporate what went wrong (revisited in the pitfalls section).
    """
    def wrapper(ctx: PipelineContext) -> PipelineContext:
        last_exc: Optional[Exception] = None
        for attempt in range(1, max_attempts + 1):
            try:
                return step_fn(ctx)
            except Exception as exc:
                last_exc = exc
                logger.warning(
                    "Step '%s' failed on attempt %d/%d: %s",
                    step_fn.__name__,
                    attempt,
                    max_attempts,
                    exc,
                )
                if attempt == max_attempts:
                    ctx.error = str(exc)
                    raise
        # unreachable, but satisfies type checkers
        raise RuntimeError("Unreachable") from last_exc

    wrapper.__name__ = step_fn.__name__  # preserve name for logging
    return wrapper

💡 Pro Tip: Keep max_attempts at 2 for validation steps. Three or more retries on a consistently failing step usually indicate a structural problem — a prompt that produces the wrong output shape, or a context field that's empty when the step expects it to be populated. Retrying more than twice delays the error signal without fixing the root cause.
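When the goal is a fresh draft rather than a second opinion on the same one, the retry has to wrap drafting and validation as a single unit. The self-contained sketch below makes the following assumptions:

- It uses a simplified context.
- It includes a copy of the wrapper with the same contract as with_retry.
- Stub draft and validate functions stand in for the LLM calls.
- The composite `draft_then_validate` step and the `draft_calls` counter are hypothetical names used for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Ctx:
    """Simplified stand-in for PipelineContext (illustration only)."""
    query: str
    draft: Optional[str] = None
    validated: bool = False
    error: Optional[str] = None
    metadata: dict = field(default_factory=dict)


StepFn = Callable[[Ctx], Ctx]


def with_retry(step_fn: StepFn, max_attempts: int = 2) -> StepFn:
    """Same contract as the section's wrapper: retry, then record the error and re-raise."""
    def wrapper(ctx: Ctx) -> Ctx:
        for attempt in range(1, max_attempts + 1):
            try:
                return step_fn(ctx)
            except Exception as exc:
                if attempt == max_attempts:
                    ctx.error = str(exc)
                    raise
        raise RuntimeError("max_attempts must be at least 1")
    wrapper.__name__ = step_fn.__name__
    return wrapper


def draft_step(ctx: Ctx) -> Ctx:
    # Stub model: the first draft misses the point, later drafts address the query.
    calls = ctx.metadata.get("draft_calls", 0) + 1
    ctx.metadata["draft_calls"] = calls
    ctx.draft = "off-topic text" if calls == 1 else f"An answer about {ctx.query}"
    return ctx


def validate_step(ctx: Ctx) -> Ctx:
    # Stub checker: passes only if the draft mentions the query.
    if ctx.query in (ctx.draft or ""):
        ctx.validated = True
        return ctx
    raise ValueError("draft does not address the query")


def draft_then_validate(ctx: Ctx) -> Ctx:
    """Hypothetical composite step: every retry produces a new draft before re-checking."""
    return validate_step(draft_step(ctx))


ctx = with_retry(draft_then_validate, max_attempts=2)(Ctx(query="RNNs"))
print(ctx.validated, ctx.metadata["draft_calls"])  # True 2
```

In the section's pipeline, this would mean replacing the separate draft and validate entries with a single with_logging(with_retry(draft_then_validate)) entry.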

Structured Logging for Every Step

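The retry wrapper handles failure recovery. To debug a failure after the fact, however, especially one that only manifests in production, you need a record of every step's input, output, and elapsed time, and ideally its token consumption. The structured log wrapper below captures the first three. Token counts are reported in each API response's usage field, so recording them would have to happen inside the steps themselves, which this example does not do.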

The logging wrapper is separate from the retry wrapper deliberately. Mixing them would create a function that does two things, making each harder to test in isolation.

import time


def with_logging(step_fn: StepFn) -> StepFn:
    """
    Wraps a step function with structured logging.
    Records: step name, input snapshot, output snapshot,
    elapsed time in milliseconds, and any exception.

    Note: the snapshots record summary fields only (the raw query, the branch,
    presence flags, and character counts), not full field contents. In
    production, redact PII (the query itself may contain some) and truncate
    large fields before writing to your log sink.
    """
    def wrapper(ctx: PipelineContext) -> PipelineContext:
        step_name = step_fn.__name__
        start_ts = time.monotonic()

        # Shallow snapshot of the context before the step runs
        input_snapshot = {
            "query": ctx.query,
            "branch": ctx.branch,
            "has_research_notes": ctx.research_notes is not None,
            "has_draft": ctx.draft is not None,
        }

        try:
            result_ctx = step_fn(ctx)
            elapsed_ms = (time.monotonic() - start_ts) * 1000

            log_record = {
                "step": step_name,
                "status": "ok",
                "elapsed_ms": round(elapsed_ms, 1),
                "input": input_snapshot,
                "output": {
                    "branch": result_ctx.branch,
                    "research_notes_chars": len(result_ctx.research_notes or ""),
                    "draft_chars": len(result_ctx.draft or ""),
                    "validated": result_ctx.validated,
                },
            }
            logger.info(json.dumps(log_record))

            # Persist record to context metadata for end-to-end tracing
            result_ctx.metadata.setdefault("steps", []).append(log_record)
            return result_ctx

        except Exception as exc:
            elapsed_ms = (time.monotonic() - start_ts) * 1000
            log_record = {
                "step": step_name,
                "status": "error",
                "elapsed_ms": round(elapsed_ms, 1),
                "input": input_snapshot,
                "error": str(exc),
            }
            logger.error(json.dumps(log_record))
            # ctx may be partially mutated; attach the failure record to it so
            # the caller's reference still carries the full trace.
            ctx.metadata.setdefault("steps", []).append(log_record)
            raise

    wrapper.__name__ = step_fn.__name__
    return wrapper

Storing the log records inside ctx.metadata["steps"] gives you something valuable: at the end of any pipeline run — successful or failed — the context object itself is a complete audit trail. You can pickle the final context, ship it to a debug queue, and replay the failure offline without needing to reconstruct state from separate log files.
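Because every record lands in ctx.metadata["steps"], post-mortem tooling can work directly off the context. The sketch below assumes the record shape that with_logging produces. The `summarize_run` helper is hypothetical, and the sample timings are made up purely for illustration.

```python
import json


def summarize_run(steps: list[dict]) -> dict:
    """Condense with_logging-style records into a short post-mortem summary."""
    failed = [s["step"] for s in steps if s["status"] == "error"]
    return {
        "steps_run": len(steps),
        "total_ms": round(sum(s["elapsed_ms"] for s in steps), 1),
        "slowest": max(steps, key=lambda s: s["elapsed_ms"])["step"] if steps else None,
        "first_failure": failed[0] if failed else None,
    }


# Records shaped like those with_logging appends (illustrative values).
steps = [
    {"step": "classify_step", "status": "ok", "elapsed_ms": 412.3},
    {"step": "router_step", "status": "ok", "elapsed_ms": 1530.0},
    {"step": "draft_step", "status": "ok", "elapsed_ms": 2875.4},
    {"step": "validate_step", "status": "error", "elapsed_ms": 390.1,
     "error": "Validation failed"},
]
print(json.dumps(summarize_run(steps)))
```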

🤔 Did you know? Reconstructing a failed pipeline run from incomplete logs gets harder with every step you add, because each additional step multiplies the number of ways partial state can be ambiguous. Storing step telemetry inside the shared context object sidesteps this problem: the artifact that failed carries its own history.

Assembling the Pipeline

With the step functions defined and the two wrappers in place, assembling the pipeline is a matter of listing the steps in order, wrapping each one, and running them in sequence.

def run_pipeline(query: str) -> PipelineContext:
    """
    Builds and runs the research-and-draft pipeline for a single query.
    Returns the final PipelineContext, which contains the draft, validation
    status, and full step-level telemetry in ctx.metadata['steps'].
    """
    ctx = PipelineContext(query=query)

    # Define the ordered list of steps.
    # Each step is wrapped: logging outermost, then retry for steps that can
    # legitimately succeed on a second attempt.
    pipeline: list[StepFn] = [
        with_logging(classify_step),                          # router: classify
        with_logging(router_step),                            # router: dispatch
        with_logging(draft_step),                             # drafting
        with_logging(with_retry(validate_step, max_attempts=2)),  # quality gate
    ]

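    for step in pipeline:
        try:
            ctx = step(ctx)
        except Exception as exc:
            # Steps signal unrecoverable failure by raising (with_retry re-raises
            # after its last attempt), so catch here and halt instead of letting
            # the exception escape run_pipeline.
            ctx.error = ctx.error or str(exc)
            logger.error("Pipeline halted at step '%s': %s", step.__name__, exc)
            break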

    return ctx


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    result = run_pipeline("What are the main differences between transformer and RNN architectures?")
    if result.validated:
        print(result.draft)
    else:
        print(f"Pipeline did not produce a validated draft. Error: {result.error}")

The pipeline list makes the execution order explicit and readable — a new team member can scan it and understand the entire flow without reading the step implementations. Adding a new step means appending to the list, not touching any existing step.

💡 Real-World Example: A common pattern in production pipelines is to separate step definition from step execution by storing the pipeline list in a configuration object rather than hardcoding it in run_pipeline. This allows different pipeline configurations (e.g., a fast path that skips the validation step for low-stakes queries) to be selected at runtime without code changes.
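Here is a sketch of that idea. It assumes the same context-in/context-out contract. The `PIPELINES` registry, the "full" and "fast" profile names, and the stub steps are all hypothetical, and the stubs only record that they ran.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Ctx:
    """Simplified stand-in for PipelineContext (illustration only)."""
    query: str
    trace: list[str] = field(default_factory=list)


StepFn = Callable[[Ctx], Ctx]


def make_stub(name: str) -> StepFn:
    """Build a stub step that only records that it ran."""
    def step(ctx: Ctx) -> Ctx:
        ctx.trace.append(name)
        return ctx
    step.__name__ = name
    return step


classify, route, draft, validate = [make_stub(n) for n in ("classify", "route", "draft", "validate")]

# Pipeline shapes live in data, so a runtime setting can pick one without code changes.
PIPELINES: dict[str, list[StepFn]] = {
    "full": [classify, route, draft, validate],
    "fast": [classify, route, draft],  # skips validation for low-stakes queries
}


def run_configured(query: str, profile: str = "full") -> Ctx:
    ctx = Ctx(query=query)
    for step in PIPELINES[profile]:
        ctx = step(ctx)
    return ctx


print(run_configured("hello", profile="fast").trace)  # ['classify', 'route', 'draft']
```

In a deployment, the profile name might come from a config file, a feature flag, or a field on the request.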

Where Fan-Out/Fan-In Would Slot In

Look at the fact_lookup_step in the pipeline above. It makes a single LLM call to gather research notes. For many queries, this is adequate. For complex factual questions — ones where you'd want to consult multiple sources, run several searches in parallel, or generate independent summaries and then reconcile them — a single call is a bottleneck.

This is exactly the scenario where fan-out/fan-in replaces fact_lookup_step. The replacement is surgical: because every step shares the same PipelineContext contract, you can swap fact_lookup_step for a parallel_fact_lookup_step that spawns multiple concurrent calls and merges their outputs into ctx.research_notes before returning. The rest of the pipeline — the draft step, the validate step, the wrappers — sees no change at all.

Current single-call research step:

  router_step
      │
      ▼
  fact_lookup_step  ──→  draft_step

Fan-out/fan-in replacement (covered in the next lesson):

  router_step
      │
      ▼
  ┌───────────────────────────────────┐
  │        FAN-OUT                    │
  │  search_A  search_B  search_C     │
  │      └────────┬────────┘          │
  │           FAN-IN                  │
  │       merge_and_rank              │
  └───────────────────────────────────┘
      │
      ▼  (ctx.research_notes populated as before)
  draft_step

The key insight is that fan-out/fan-in is a step-shaped operation: it accepts a PipelineContext and returns one. The concurrency is an internal implementation detail of that step, invisible to the rest of the pipeline. The next lesson covers how to implement that internal concurrency with Python's asyncio or concurrent.futures, and how to design the merge function so it degrades gracefully when one of the parallel calls fails.
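The next lesson covers this properly. As a rough preview, here is one way the step-shaped replacement could look using concurrent.futures. The sketch makes these assumptions:

- It uses a simplified context.
- It calls three hypothetical search functions, and search_b deliberately fails to show degradation.
- The 30-second timeout is illustrative.

The merge keeps whatever succeeded, in submission order, and notes the failures rather than aborting.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Ctx:
    """Simplified stand-in for PipelineContext (illustration only)."""
    query: str
    research_notes: Optional[str] = None


# Hypothetical retrieval calls; real ones would hit search APIs or an LLM.
def search_a(q: str) -> str:
    return f"- A: note on {q}"


def search_b(q: str) -> str:
    raise TimeoutError("source B timed out")  # simulate one failing branch


def search_c(q: str) -> str:
    return f"- C: note on {q}"


SEARCHES: list[Callable[[str], str]] = [search_a, search_b, search_c]


def parallel_fact_lookup_step(ctx: Ctx) -> Ctx:
    """Fan out to every search concurrently, then fan in to ctx.research_notes."""
    results, failures = [], []
    with ThreadPoolExecutor(max_workers=len(SEARCHES)) as pool:
        futures = [pool.submit(fn, ctx.query) for fn in SEARCHES]
        # Collect in submission order so the merged notes are deterministic.
        for fn, fut in zip(SEARCHES, futures):
            try:
                results.append(fut.result(timeout=30))
            except Exception as exc:  # degrade gracefully: keep the other branches
                failures.append(f"{fn.__name__}: {exc}")
    if not results:
        raise RuntimeError("all research branches failed: " + "; ".join(failures))
    notes = "\n".join(results)
    if failures:
        notes += "\n(unavailable: " + "; ".join(failures) + ")"
    ctx.research_notes = notes
    return ctx


print(parallel_fact_lookup_step(Ctx(query="transformers")).research_notes)
```

Nothing outside this function changes: it still takes a context and returns one with research_notes populated.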

🎯 Key Principle: Design every component in a pipeline to be step-shaped — one context in, one context out — and complex orchestration patterns like fan-out, ensemble aggregation, or conditional branching become implementation details that the pipeline runner never needs to know about. Complexity is contained, not distributed.

Putting It All Together: What This Example Demonstrates

The research-and-draft pipeline in this section is compact by design, but every structural decision it makes generalizes directly to larger systems:

🔧 Uniform step contract: PipelineContext in, PipelineContext out. A single shared signature makes error handling, logging, and step composition a solved problem rather than something each step reinvents.

📚 Router as a step — the classify-then-dispatch pattern cleanly separates the classification decision from the branch implementations, making it easy to add a third branch (say, a data-analysis route) without touching the existing branches.

🎯 Wrapper composition — logging wraps retry, which wraps the step function. Each wrapper does exactly one thing. This is the decorator pattern applied to pipeline steps, and it means you can add rate-limit tracking, cost accounting, or cache lookup as additional wrappers without modifying any step.

🧠 Telemetry inside the context — storing step records in ctx.metadata ensures that a failed pipeline run is self-describing. The artifact that failed carries the evidence needed to understand why.
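As an example of adding behavior purely through wrapper composition, here is a sketch of a cache-lookup wrapper. It assumes the wrapped step is deterministic enough to reuse (classification at temperature 0, say) and that the step name plus query identifies its result. `with_cache` and the in-memory dict are hypothetical; a production version would need eviction and a shared store.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Ctx:
    """Simplified stand-in for PipelineContext (illustration only)."""
    query: str
    branch: Optional[str] = None


StepFn = Callable[[Ctx], Ctx]


def with_cache(step_fn: StepFn, fields: tuple[str, ...], cache: dict) -> StepFn:
    """Skip step_fn when the same step has already run for the same query.

    Only the listed output fields are cached and restored onto the incoming
    context, so the rest of the context is left untouched.
    """
    def wrapper(ctx: Ctx) -> Ctx:
        key = (step_fn.__name__, ctx.query)
        if key in cache:
            for name, value in cache[key].items():
                setattr(ctx, name, value)
            return ctx
        ctx = step_fn(ctx)
        cache[key] = {name: getattr(ctx, name) for name in fields}
        return ctx
    wrapper.__name__ = step_fn.__name__
    return wrapper


llm_calls: list[str] = []


def classify_step(ctx: Ctx) -> Ctx:
    llm_calls.append(ctx.query)  # stands in for a paid LLM call
    ctx.branch = "fact"
    return ctx


cached_classify = with_cache(classify_step, fields=("branch",), cache={})
cached_classify(Ctx(query="What is an RNN?"))
second = cached_classify(Ctx(query="What is an RNN?"))
print(second.branch, len(llm_calls))  # fact 1
```

Because the wrapper has the same StepFn shape, it stacks with the others, e.g. with_logging(with_cache(classify_step, ("branch",), cache)). The creative branch at temperature 0.8 would be a poor candidate, since its output is meant to vary.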

⚠️ Common Mistake: Logging step outputs to a separate system and discarding the context after the pipeline completes. When a failure occurs in step 4 of an 8-step pipeline, you need to know what step 2 actually produced — not what you expected it to produce. If that information lives only in an external log that you correlate by timestamp, you're one log-ingestion lag away from debugging blind.

The simplifications in this example are worth acknowledging. The validate_step checks the draft with gpt-4o-mini, a smaller model from the same family as the gpt-4o model that wrote it, so systematic errors the two models share may not be caught. In practice you would use a model from a different family for validation, add rule-based checks (length, keyword presence), or both. The retry logic also re-runs the step without injecting any information about why it failed. A more sophisticated version would append the error to the context so the re-prompt can correct for the specific problem.
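The rule-based option is cheap to sketch. This version assumes the context fields used in this section. The 200-word limit mirrors the drafting prompt. The stopword list, the keyword-overlap heuristic, and the 0.5 threshold are illustrative assumptions, not recommended settings.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class Ctx:
    """Simplified stand-in for PipelineContext (illustration only)."""
    query: str
    draft: Optional[str] = None
    validated: bool = False


# Illustrative stopword list; a real check would use a proper tokenizer.
STOPWORDS = {"the", "a", "an", "and", "of", "what", "are", "is", "between", "to", "in"}


def content_words(text: str) -> set[str]:
    return {w for w in re.findall(r"[a-z]+", text.lower()) if w not in STOPWORDS}


def rule_based_validate_step(ctx: Ctx, max_words: int = 200, min_overlap: float = 0.5) -> Ctx:
    """Deterministic checks that can run before (or instead of) an LLM judge."""
    draft = ctx.draft or ""
    if not draft.strip():
        raise ValueError("empty draft")
    if len(draft.split()) > max_words:
        raise ValueError(f"draft exceeds {max_words} words")
    query_terms = content_words(ctx.query)
    if query_terms:
        overlap = len(query_terms & content_words(draft)) / len(query_terms)
        if overlap < min_overlap:
            raise ValueError(f"draft covers only {overlap:.0%} of the query's key terms")
    ctx.validated = True
    return ctx


ok = rule_based_validate_step(Ctx(
    query="differences between transformer and RNN architectures",
    draft="Transformers use attention; RNN architectures process tokens sequentially. "
          "The main differences are parallelism and memory.",
))
print(ok.validated)  # True

try:
    rule_based_validate_step(Ctx(query="RNN vs transformer", draft="Bananas are yellow."))
except ValueError as exc:
    print(exc)  # draft covers only 0% of the query's key terms
```

Checks like these are cheap enough to run before an LLM judge, so obviously broken drafts never cost a model call.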

Those refinements are extensions of the same structural foundation, not replacements for it. The pipeline you've built here is a working skeleton. Production readiness means adding detail to the wrappers and the prompt design, not tearing down the architecture.
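The other refinement mentioned above, feeding the failure reason back into the retry, needs only a small change to the wrapper. The sketch below makes these assumptions:

- The context has a hypothetical `retry_feedback` field.
- A hypothetical prompt builder appends that field to its instructions when it is present.
- A stub stands in for the model.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Ctx:
    """Simplified stand-in for PipelineContext (illustration only)."""
    query: str
    draft: Optional[str] = None
    retry_feedback: Optional[str] = None  # hypothetical field read by the prompt builder
    error: Optional[str] = None


StepFn = Callable[[Ctx], Ctx]


def with_feedback_retry(step_fn: StepFn, max_attempts: int = 2) -> StepFn:
    """Like with_retry, but stores the last error on ctx so the next attempt can see it."""
    def wrapper(ctx: Ctx) -> Ctx:
        for attempt in range(1, max_attempts + 1):
            try:
                return step_fn(ctx)
            except Exception as exc:
                if attempt == max_attempts:
                    ctx.error = str(exc)
                    raise
                ctx.retry_feedback = f"Your previous attempt was rejected: {exc}"
        raise RuntimeError("max_attempts must be at least 1")
    wrapper.__name__ = step_fn.__name__
    return wrapper


def build_draft_prompt(ctx: Ctx) -> str:
    """Hypothetical prompt builder that folds retry feedback into the instructions."""
    prompt = f"Answer the query in under 200 words.\nQuery: {ctx.query}"
    if ctx.retry_feedback:
        prompt += f"\nFix this problem: {ctx.retry_feedback}"
    return prompt


def draft_step(ctx: Ctx) -> Ctx:
    prompt = build_draft_prompt(ctx)
    # Stub model: only succeeds once the prompt carries feedback.
    if "Fix this problem" not in prompt:
        raise ValueError("draft ignored the query")
    ctx.draft = "A focused answer."
    return ctx


ctx = with_feedback_retry(draft_step)(Ctx(query="What is an RNN?"))
print(ctx.retry_feedback)  # Your previous attempt was rejected: draft ignored the query
```

A production version might also clear retry_feedback after a successful attempt so it doesn't leak into later steps.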

Common Pitfalls When Building Agent Pipelines

Building a pipeline that works once is surprisingly easy. Building one that works reliably across hundreds of diverse inputs, survives a prompt edit made three weeks later, and degrades gracefully when a single step returns garbage — that is the harder problem, and it is where most teams stumble. The pitfalls covered in this section are not exotic edge cases; they are the specific failure modes that appear repeatedly when LLM calls are wired together without deliberate design. Each one has a concrete shape: a symptom you can observe, a root cause you can name, and a structural fix you can apply.

Pitfall 1: Prompt Bleed

Prompt bleed occurs when each step in a pipeline receives the full accumulated context from every prior step, rather than a clean, extracted summary of only the information it actually needs. The problem compounds as pipelines grow: by step four or five, a prompt that started at a few hundred tokens may balloon to several thousand, carrying along raw intermediate outputs, tool call results, and conversational scaffolding that the current step has no use for.

The concrete cost is twofold. First, token count inflates at every step: you pay for the same early context repeatedly across all downstream calls. Second, and more subtly, models do not attend evenly across long contexts, and irrelevant material can pull focus away from the part that matters. Suppose your step-one output is a verbose 800-token research summary and your step-four task is to extract a single decision field. The model then has to work around noise it shouldn't care about, and output quality can degrade.
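A back-of-the-envelope calculation makes the first cost concrete. Purely for illustration, assume a 300-token instruction prompt per step and an 800-token output from every step. The sketch compares forwarding all prior outputs with forwarding only the immediately preceding step's output.

```python
def total_input_tokens(n_steps: int, prompt_tokens: int, output_tokens: int,
                       forward_all: bool) -> int:
    """Sum the input tokens across all steps of a linear chain."""
    total = 0
    for k in range(n_steps):  # k = number of earlier steps
        if forward_all:
            carried = output_tokens * k          # every prior output, verbatim
        else:
            carried = output_tokens if k else 0  # only the previous step's output
        total += prompt_tokens + carried
    return total


for n in (3, 5, 8):
    bleed = total_input_tokens(n, 300, 800, forward_all=True)
    scoped = total_input_tokens(n, 300, 800, forward_all=False)
    print(f"{n} steps: {bleed} vs {scoped} input tokens")
# 3 steps: 3300 vs 2500 input tokens
# 5 steps: 9500 vs 4700 input tokens
# 8 steps: 24800 vs 8000 input tokens
```

When everything is forwarded, the carried term is 0 + 1 + ... + (n - 1) outputs, so total input grows quadratically with chain length. Scoped forwarding grows linearly. Projecting to structured fields shrinks the carried term further.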

❌ Wrong thinking: "Passing everything downstream ensures no information is lost."
✅ Correct thinking: "Each step should receive a precise contract: only the fields its prompt was designed to consume."

The fix is to treat the output of each step as a structured extraction, not a raw text blob to be forwarded wholesale. If step one produces a research summary, step two should receive only the structured fields that step two's prompt was written to operate on.

from dataclasses import dataclass
from typing import Optional

@dataclass
class ResearchOutput:
    """Structured result extracted from the research step."""
    topic: str
    key_claims: list[str]      # at most five bullet points
    confidence: str            # "high" | "medium" | "low"
    source_count: int

@dataclass
class DraftInput:
    """Only the fields the drafting step needs — nothing more."""
    topic: str
    key_claims: list[str]
    tone: str                  # injected by the router, not from research output

def research_to_draft_input(research: ResearchOutput, tone: str) -> DraftInput:
    """
    Explicit projection: drop confidence and source_count, which the
    drafting prompt does not reference and would only add noise.
    """
    return DraftInput(
        topic=research.topic,
        key_claims=research.key_claims,
        tone=tone,
    )

This small projection function is doing real architectural work. It makes the information contract between steps explicit and auditable. When a downstream step starts producing odd output, you can inspect exactly what it received rather than reasoning about a growing blob of text.
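The payoff shows up when the drafting prompt is assembled. The sketch below assumes the prompt is built by a helper like the hypothetical `build_draft_prompt`, and the prompt wording is illustrative. DraftInput is repeated so the block runs on its own.

```python
from dataclasses import dataclass


@dataclass
class DraftInput:
    """Only the fields the drafting step needs (mirrors the definition above)."""
    topic: str
    key_claims: list[str]
    tone: str


def build_draft_prompt(inp: DraftInput) -> str:
    """Hypothetical helper: render only the projected fields, so nothing else can leak in."""
    claims = "\n".join(f"- {c}" for c in inp.key_claims)
    return (
        f"Write a short piece about {inp.topic} in a {inp.tone} tone.\n"
        f"Base it only on these claims:\n{claims}"
    )


print(build_draft_prompt(DraftInput(
    topic="RNNs vs transformers",
    key_claims=["RNNs process tokens sequentially", "Transformers use self-attention"],
    tone="conversational",
)))
```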

💡 Pro Tip: If you find yourself writing a prompt that begins "Given everything above, now do X," that is a signal you have prompt bleed. Rewrite the prompt to start from a clean structured input and project the relevant fields explicitly.

Pitfall 2: Shared Mutable State Without Coordination

This pitfall appears most often in fan-out/fan-in patterns or anywhere multiple steps run concurrently and write results back to a shared pipeline context. The symptom is non-deterministic output: the same input produces different pipeline results on different runs, with no obvious explanation.

The root cause is a classic write-write conflict: two steps both write to the same context key without any coordination about ordering. Consider a fan-out where three specialized agents each append their findings to a shared context["findings"] list. If the aggregation step reads that list before all three writers have finished, it sees an incomplete list. Even when it waits, the order of items depends on which branch happened to finish first. A branch that reads the list, awaits a model call, and then writes back a new list can also silently overwrite another branch's findings.

Fan-out with uncoordinated shared state:

  [Input]
     |
  [Router]
  /   |   \
[A] [B] [C]   ← all three write to context["findings"]
  \   |   /
  [Aggregate]  ← reads context["findings"]... but how many items? in what order?

The safe pattern is to give each parallel step its own isolated output slot and then perform a single explicit merge in a dedicated aggregation step. The aggregation step owns the merge logic; no individual step writes to a shared collection.

import asyncio
from dataclasses import dataclass, field

@dataclass
class PipelineState:
    input_text: str
    # Each branch writes to its own slot — no shared mutable collection
    findings_a: list[str] = field(default_factory=list)
    findings_b: list[str] = field(default_factory=list)
    findings_c: list[str] = field(default_factory=list)
    merged_findings: list[str] = field(default_factory=list)

async def run_branch_a(state: PipelineState) -> None:
    # Simulates an LLM call; writes only to its own slot
    state.findings_a = ["claim from branch A"]

async def run_branch_b(state: PipelineState) -> None:
    state.findings_b = ["claim from branch B"]

async def run_branch_c(state: PipelineState) -> None:
    state.findings_c = ["claim from branch C"]

def merge_findings(state: PipelineState) -> None:
    """
    Single, deterministic merge step. Ordering is explicit and
    reproducible — no race condition possible.
    """
    state.merged_findings = (
        state.findings_a +
        state.findings_b +
        state.findings_c
    )

async def run_pipeline(text: str) -> PipelineState:
    state = PipelineState(input_text=text)
    # Fan-out: all three branches run concurrently but write to separate slots
    await asyncio.gather(
        run_branch_a(state),
        run_branch_b(state),
        run_branch_c(state),
    )
    # Fan-in: one deterministic merge after all branches have completed
    merge_findings(state)
    return state

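⚠️ Common Mistake: Using a single shared list and calling .append() from multiple async tasks. Python's asyncio does not protect you from logical race conditions. Individual .append() calls won't interleave on the event loop, but their order is set by whichever awaited call returns first, not by any meaningful logic in your pipeline. Worse, if a branch reads the list, awaits an LLM call, and then writes back an updated copy, it can overwrite results that another branch added in the meantime.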
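The following self-contained sketch shows how a shared collection can lose data under asyncio. Each branch reads the shared list, awaits a simulated LLM call (asyncio.sleep with different delays), and then writes back a new list.

```python
import asyncio


async def branch(shared: dict, name: str, delay: float) -> None:
    snapshot = list(shared["findings"])      # read
    await asyncio.sleep(delay)               # simulated LLM call
    shared["findings"] = snapshot + [name]   # write back, clobbering other branches


async def main() -> list[str]:
    shared: dict = {"findings": []}
    await asyncio.gather(
        branch(shared, "A", 0.03),
        branch(shared, "B", 0.01),
        branch(shared, "C", 0.02),
    )
    return shared["findings"]


print(asyncio.run(main()))  # ['A']: B's and C's findings were overwritten
```

All three branches read the empty list before any of them resumes, so the last one to finish wins. With the isolated-slot PipelineState shown earlier, each branch writes its own field and there is nothing to overwrite.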

🎯 Key Principle: Parallel steps should be pure producers, each writing to an isolated output. Merging is the job of a dedicated step that runs after all producers have completed.

Pitfall 3: Testing on Three Manual Cases and Calling It Done

A pipeline that produces good output on three hand-crafted examples will often degrade noticeably when you tweak a prompt, change a model parameter, or run it against a broader distribution of inputs. Without an automated evaluation harness, you will not know the regression happened until a user reports it — which may be weeks after the change that caused it.

The temptation to skip systematic testing is understandable: LLM outputs are probabilistic and hard to assert against with traditional unit tests. But the jump from "hard to test perfectly" to "not worth testing at all" is a costly one. A small eval harness with even ten to twenty labeled examples can catch many of the regressions introduced by prompt edits, particularly changes in how inputs are classified or routed.

The minimal harness you need is not sophisticated. You need: a fixed set of inputs with known expected outputs (or expected output properties), a scoring function that can be applied automatically, and a way to run that suite before merging any prompt change.

import json
from pathlib import Path
from typing import Callable

def run_eval_suite(
    pipeline_fn: Callable[[str], dict],
    suite_path: str = "eval_suite.json",
) -> dict:
    """
    Loads a JSON file of {"input": ..., "expected": ...} records,
    runs the pipeline against each, and returns a summary.

    Keeps scoring simple: exact match on a key field is often enough to
    catch routing and classification regressions caused by prompt edits.
    """
    suite = json.loads(Path(suite_path).read_text())
    results = {"passed": 0, "failed": 0, "failures": []}

    for case in suite:
        expected = case["expected"]["classification"]
        try:
            output = pipeline_fn(case["input"])
            # Score on the field that matters most for this pipeline
            actual = output.get("classification")
        except Exception as exc:  # a crash on one case counts as a failure, not an abort
            actual = f"error: {exc}"

        if actual == expected:
            results["passed"] += 1
        else:
            results["failed"] += 1
            results["failures"].append({
                "input": case["input"],
                "expected": expected,
                "actual": actual,
            })

    total = results["passed"] + results["failed"]
    results["pass_rate"] = results["passed"] / total if total > 0 else 0.0
    print(f"Eval: {results['passed']}/{total} passed ({results['pass_rate']:.0%})")
    return results

This is deliberately minimal. A real harness would add richer scoring (semantic similarity, rubric-based LLM-as-judge for open-ended outputs, latency tracking). Even this bare-bones version, though, catches what is often the most damaging category of regression: a prompt edit that silently changes how the model classifies or routes inputs.
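Running the suite before every merge is easier to enforce when the check is automatic. The sketch below shows a pass-rate gate. It assumes results shaped like run_eval_suite's return value and a baseline pass rate stored from the last accepted run. `regression_gate`, the baseline, and the placeholder results are hypothetical.

```python
def regression_gate(results: dict, baseline_pass_rate: float, tolerance: float = 0.0) -> bool:
    """Return True if the eval pass rate has not fallen below the recorded baseline."""
    ok = results["pass_rate"] + tolerance >= baseline_pass_rate
    if not ok:
        print(f"Regression: pass rate {results['pass_rate']:.0%} "
              f"is below baseline {baseline_pass_rate:.0%}")
        for failure in results["failures"]:
            print(f"  input={failure['input']!r} expected={failure['expected']!r} "
                  f"actual={failure['actual']!r}")
    return ok


# Placeholder results in run_eval_suite's shape; in CI these come from a real run.
results = {
    "passed": 9, "failed": 1, "pass_rate": 0.9,
    "failures": [{"input": "Where is my refund?", "expected": "fact", "actual": "creative"}],
}
if not regression_gate(results, baseline_pass_rate=0.95):
    print("Blocking merge: review the failures above against the prompt change.")
```

In CI, the boolean can be turned into a process exit code, for example sys.exit(0 if ok else 1), so that a failing gate blocks the merge.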

💡 Real-World Example: A common scenario is a team refining a routing prompt to handle a new edge case, inadvertently changing the few-shot examples in a way that shifts the model's behavior on previously-working cases. Without an eval suite, the regression ships. With twenty labeled cases and the harness above, the pass rate drops visibly on the next run and the change is investigated before it reaches users.

🤔 Did you know? The gap between a pipeline that works on curated demos and one that works on production traffic is largely a sampling problem. Handcrafted test cases tend to cluster around the cases you already know how to handle, leaving the tail of the input distribution untested. A small but diverse eval suite, built by saving and labeling real inputs as the pipeline runs, often closes this gap faster than careful prompt design alone.

Pitfall 4: Error Handling at the Application Level Instead of the Step Level

Application-level error handling means wrapping the entire pipeline in a single try/except block and treating any failure as a reason to abort the whole run. This pattern is natural when you first wire steps together — it is the path of least resistance — but it makes your pipeline brittle in a specific, avoidable way.

The problem is that LLM steps fail in heterogeneous ways. A step might return malformed JSON that fails your parser. A model endpoint might return a timeout on a transient network hiccup. A particular input might trigger a safety refusal that produces an unexpected output shape. These are all different failure modes with different appropriate responses: a parse failure might warrant a retry with an explicit format reminder; a timeout might warrant a simple backoff retry; a safety refusal might warrant a fallback to a simpler prompt or a default value.

When your only error boundary is at the application level, all of these collapse into a single "pipeline failed" outcome, and you lose the ability to recover selectively.

❌ Application-level only:

  try:
    step_1() → step_2() → step_3() → step_4()
  except:
    abort entire run

  A transient timeout in step_3 destroys all work from steps 1 and 2.

✅ Step-level with targeted recovery:

  step_1()          ← succeeds, result cached
  step_2()          ← succeeds, result cached
  step_3()          ← fails (timeout)
    └── retry(step_3, max_attempts=2)
        └── if still fails → fallback_step_3()
  step_4(result_of_step_3_or_fallback)

The structural fix is to give each step its own error boundary with three possible outcomes: success, recoverable failure (retry or fallback), and unrecoverable failure (propagate up). This is the same pattern used in robust distributed systems, and it applies directly to LLM pipelines.

import time
from typing import TypeVar, Callable, Optional

T = TypeVar("T")

def call_with_retry(
    fn: Callable[[], T],
    max_attempts: int = 3,
    backoff_seconds: float = 1.0,
    fallback: Optional[Callable[[], T]] = None,
) -> T:
    """
    Step-level retry wrapper with optional fallback.
    Keeps error handling local to the step, not the application.
    Unlike the StepFn-wrapping with_retry earlier in this lesson, this
    helper invokes fn immediately and returns its result.
    """
    last_exc: Optional[Exception] = None

    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:  # noqa: BLE001
            last_exc = exc
            if attempt < max_attempts:
                time.sleep(backoff_seconds * attempt)  # linear backoff
            else:
                # All retries exhausted
                if fallback is not None:
                    return fallback()
                raise RuntimeError(
                    f"Step failed after {max_attempts} attempts"
                ) from last_exc

    # Unreachable, but satisfies type checker
    raise RuntimeError("Unexpected exit from retry loop")


# Usage in a pipeline
# (call_research_step and call_draft_step stand in for your actual step calls)
def run_pipeline(input_text: str) -> dict:
    research = call_with_retry(
        fn=lambda: call_research_step(input_text),
        max_attempts=3,
        fallback=lambda: {"key_claims": [], "confidence": "low"},
    )

    draft = call_with_retry(
        fn=lambda: call_draft_step(research),
        max_attempts=2,
        # No fallback here: a drafting failure should propagate
    )

    return draft

⚠️ Common Mistake: Using the same retry count and backoff for every step. A fast formatting step that fails in under 100ms can be retried aggressively; a slow research step with a 10-second latency needs a more conservative retry budget. Size the retry policy to the step.
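A retry policy can be expressed as data to act on both points: different failure types need different responses, and different steps need different budgets. The sketch below rests on these assumptions:

- `RetryPolicy`, `MalformedOutputError`, and the budget numbers are hypothetical.
- The sleep function is injectable so the example runs instantly.
- Exceptions outside a policy's `retry_on` tuple propagate immediately.

```python
import time
from dataclasses import dataclass
from typing import Callable, TypeVar

T = TypeVar("T")


class MalformedOutputError(Exception):
    """Hypothetical error raised when a step's output fails to parse."""


@dataclass(frozen=True)
class RetryPolicy:
    max_attempts: int
    backoff_seconds: float
    retry_on: tuple[type[BaseException], ...]  # only these failure types are retried


# Illustrative budgets: cheap formatting retries quickly and also retries parse
# failures; slow research retries once, and only on timeouts.
POLICIES = {
    "format": RetryPolicy(4, 0.1, (MalformedOutputError, TimeoutError)),
    "research": RetryPolicy(2, 5.0, (TimeoutError,)),
}


def run_with_policy(fn: Callable[[], T], policy: RetryPolicy,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Retry fn per the step's policy; anything not in retry_on propagates at once."""
    for attempt in range(1, policy.max_attempts + 1):
        try:
            return fn()
        except policy.retry_on:
            if attempt == policy.max_attempts:
                raise
            sleep(policy.backoff_seconds * attempt)  # linear backoff
    raise RuntimeError("max_attempts must be at least 1")


attempts: list[int] = []


def flaky_format() -> str:
    """Stub step: returns malformed output twice, then valid JSON."""
    attempts.append(1)
    if len(attempts) < 3:
        raise MalformedOutputError("not valid JSON")
    return '{"ok": true}'


result = run_with_policy(flaky_format, POLICIES["format"], sleep=lambda s: None)
print(result, len(attempts))  # {"ok": true} 3
```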

Pitfall 5: Over-Decomposition

The instinct to decompose complex tasks into smaller steps is sound — it is one of the core reasons pipeline architectures exist. But decomposition has a cost: every additional step adds a round-trip latency, a new prompt to maintain, a new failure point, and a new surface area for context translation errors. When a task is split more finely than the problem requires, these costs accumulate without any corresponding improvement in output quality.

Over-decomposition is the pattern of splitting a task into more steps than necessary because decomposition feels like rigor. The tell is when a step's entire job is to reformat or lightly transform the output of the previous step — work that could have been part of the previous step's prompt at no meaningful cost.

Over-decomposed (unnecessary):

  [Extract entities] → [Format entities as JSON] → [Validate JSON] → [Score entities]
                            ↑
                    This step does nothing the
                    extraction prompt couldn't do
                    with an output format instruction.

Appropriate decomposition:

  [Extract + format entities as JSON] → [Score entities]
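In the merged version, the JSON format instruction moves into the extraction prompt, and the shape check becomes ordinary code rather than another model call. The sketch below assumes a hypothetical prompt wording and an entity schema of `name` and `type`. `raw_reply` stands in for the model's response.

```python
import json


def build_extract_prompt(text: str) -> str:
    # The format instruction lives in the extraction prompt; no separate formatting step.
    return (
        "Extract every named entity from the text below. Respond with only a JSON "
        'array of objects shaped like {"name": "...", "type": "..."}.\n\n'
        f"Text:\n{text}"
    )


def parse_entities(raw: str) -> list[dict]:
    """Validate the model's JSON in code: cheaper and more predictable than an LLM check."""
    data = json.loads(raw)  # raises json.JSONDecodeError (a ValueError) on malformed output
    if not isinstance(data, list):
        raise ValueError("expected a JSON array")
    for item in data:
        if not (isinstance(item, dict) and isinstance(item.get("name"), str)
                and isinstance(item.get("type"), str)):
            raise ValueError(f"bad entity: {item!r}")
    return data


raw_reply = '[{"name": "Ada Lovelace", "type": "person"}, {"name": "London", "type": "place"}]'
print([e["name"] for e in parse_entities(raw_reply)])  # ['Ada Lovelace', 'London']
```

If parse_entities raises, a step-level retry with a format reminder is usually enough; no extra LLM step is needed.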

The rule of thumb is that each split should be justified by a measurable benefit: improved output quality on a specific eval, a meaningful reduction in prompt complexity, or the ability to run branches in parallel. If you cannot articulate the benefit, the split is probably unnecessary.

🎯 Key Principle: Decompose to the level where each step has a single, clearly bounded responsibility that benefits from being isolated — not further. More steps is not inherently better architecture.

A practical way to check for over-decomposition is to ask: if I merged this step with the one before it, would any of my eval cases get worse? If the answer is no, the steps should probably be merged. If the answer is yes for a meaningful fraction of cases, the split is justified.

💡 Mental Model: Think of each step boundary as a tax. You pay it in latency, in maintenance cost, and in the risk of a translation error when structured output from step N becomes the input to step N+1. You should only pay that tax when the benefit on the other side of the boundary is clearly larger than the tax itself.

Over-decomposition also makes debugging harder, not easier. When a pipeline has twelve steps and the final output is wrong, identifying which step introduced the error requires inspecting twelve sets of intermediate outputs. A well-decomposed pipeline of five steps with clear contracts is easier to debug than a sprawling twelve-step pipeline where half the steps exist for historical reasons.
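Debugging a pipeline of any length is much easier if each run records its intermediate outputs. A minimal sketch, assuming you keep a trace of `(step_name, output)` pairs in execution order and register one contract check per step; `first_contract_violation` and `needs_keys` are hypothetical helpers for illustration:

```python
from typing import Any, Callable, Optional

# A contract check returns None when a step's output honors its contract,
# or a short error message when it does not.
Validator = Callable[[Any], Optional[str]]


def first_contract_violation(
    trace: list[tuple[str, Any]],
    validators: dict[str, Validator],
) -> Optional[tuple[str, str]]:
    """Walk a recorded trace in execution order and return (step, error)
    for the first output that breaks its contract, or None if all pass."""
    for step_name, output in trace:
        check = validators.get(step_name)
        if check is None:
            continue  # no contract registered for this step
        error = check(output)
        if error is not None:
            return step_name, error
    return None


def needs_keys(*keys: str) -> Validator:
    """Build a check that an output is a dict containing the given keys."""
    def check(output: Any) -> Optional[str]:
        if not isinstance(output, dict):
            return "output is not a dict"
        missing = [k for k in keys if k not in output]
        return f"missing keys: {missing}" if missing else None
    return check


trace = [
    ("research", {"summary": "Competitor A prices per seat."}),  # wrong key name
    ("draft", {"draft_text": ""}),  # downstream symptom: an empty draft
    ("review", {"approved": True}),
]
validators = {
    "research": needs_keys("research_summary"),
    "draft": needs_keys("draft_text"),
}
print(first_contract_violation(trace, validators))
# ('research', "missing keys: ['research_summary']")
```

The search points at the step where the error originated, even though the visible symptom (an empty draft) appears one step later.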

Putting the Pitfalls Together

These five pitfalls are not independent — they often appear in clusters. A pipeline built quickly to pass a demo will tend to have prompt bleed (the easiest thing is to pass everything), no eval harness (the quick path is manual inspection), and application-level error handling (the natural first cut). Addressing them is not a one-time cleanup; it is an architectural discipline applied incrementally as the pipeline matures.
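For prompt bleed, the usual fix is to project only the fields each step needs out of the accumulated state, rather than passing everything downstream. A minimal sketch, assuming state is a plain dictionary and each step declares its inputs in a hypothetical `STEP_INPUTS` table:

```python
from typing import Any, Mapping


def project(state: Mapping[str, Any], fields: list[str]) -> dict[str, Any]:
    """Return only the named fields from the accumulated pipeline state.

    Raises KeyError if a required field is absent, so a missing input
    fails loudly instead of silently shrinking the prompt.
    """
    missing = [f for f in fields if f not in state]
    if missing:
        raise KeyError(f"state is missing required fields: {missing}")
    return {f: state[f] for f in fields}


# Hypothetical per-step input declarations: each step names the fields it reads.
STEP_INPUTS = {
    "draft": ["research_summary", "key_claims"],
    "review": ["draft_text"],
}

state = {
    "raw_documents": ["<long source text>"],  # large, and only needed by early steps
    "research_summary": "Competitor A prices per seat.",
    "key_claims": ["A charges per seat"],
    "draft_text": "Draft memo...",
}
print(project(state, STEP_INPUTS["review"]))  # {'draft_text': 'Draft memo...'}
```

Declaring inputs per step also documents each step's contract: a reviewer can see what a step reads without reading its prompt.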

📋 Quick Reference Card:

| ❌ Pitfall | 🔍 Symptom | ✅ Fix |
|---|---|---|
| 🔴 Prompt bleed | Token cost grows with pipeline depth; downstream quality degrades | Project only needed fields to each step |
| 🟠 Shared mutable state | Non-deterministic output across runs | Isolated output slots per branch; single merge step |
| 🟡 No eval harness | Prompt edits silently break existing cases | Small labeled suite run before every change |
| 🔵 App-level error handling | Single bad output crashes entire run | Step-level retry and fallback policies |
| 🟣 Over-decomposition | High latency, hard to debug, no quality gain | Each split justified by a measurable benefit |
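The step-level error handling fix can be sketched as a small wrapper that gives each step its own retry and fallback policy. The wrapper name `run_step_with_policy`, the linear backoff, and the default of three attempts are assumptions for illustration, not a standard API:

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_step_with_policy(
    step: Callable[[], T],
    max_attempts: int = 3,
    backoff_s: float = 0.5,
    fallback: Optional[Callable[[], T]] = None,
) -> T:
    """Run one pipeline step under its own retry/fallback policy.

    Retries up to `max_attempts` times in total, sleeping backoff_s * attempt
    between tries. If every attempt raises, returns fallback() when one is
    given; otherwise re-raises the last error so the caller can decide.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return step()
        except Exception as exc:  # in practice, catch narrower, step-specific errors
            last_error = exc
            if attempt < max_attempts:
                time.sleep(backoff_s * attempt)
    if fallback is not None:
        return fallback()
    raise last_error  # set on every path here: the loop ran and every attempt failed


attempts = {"n": 0}


def flaky_extract() -> str:
    """Stand-in for an LLM step whose output fails validation twice."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ValueError("output failed validation")
    return "valid output"


def always_fails() -> str:
    raise ValueError("output failed validation")


print(run_step_with_policy(flaky_extract, backoff_s=0.0))  # valid output
print(run_step_with_policy(always_fails, backoff_s=0.0, fallback=lambda: "safe default"))
# safe default
```

Because the policy is a parameter of each step rather than a single try/except around the whole run, a cheap step can retry aggressively while an expensive one falls back immediately.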

🧠 Mnemonic: BSEEO (Bleed, State, Eval, Errors, Over-decomposition). These are the five structural weaknesses to audit when a pipeline behaves unexpectedly in production. (This covers the most common categories, not every possible failure mode.)

The section that follows consolidates the full lesson's vocabulary and points toward the next topics in the roadmap, where these pipeline patterns are applied at larger scale and combined with tool use and memory systems.

Key Takeaways and What Comes Next

You started this lesson with a single LLM call. You're ending it with a vocabulary for systems — sequential chains that pass structured state, routers that select branches dynamically, fan-out/fan-in patterns that parallelize independent work, and ensemble patterns that aggregate competing answers for higher reliability. These are not just design options; they are composable building blocks that combine in the same pipeline. Understanding how they interact — and where each imposes cost — is the structural foundation for everything that follows in this course.

This final section consolidates that vocabulary, surfaces the design principles that cut across all four patterns, and maps the path forward into the more detailed lessons ahead.


The Four Patterns, Reconsolidated

Before moving to principles, it helps to see all four patterns in one place. Each has a distinct shape, a distinct failure mode, and a distinct cost profile.

SEQUENTIAL CHAIN          ROUTER
─────────────────         ──────────────────
[Step A]                  [Classify]
   │                          │
   ▼                    ┌─────┴──────┐
[Step B]              [Path A]   [Path B]
   │                    │            │
   ▼                    └─────┬──────┘
[Step C]                      ▼
                         [Continue]

FAN-OUT / FAN-IN          ENSEMBLE
─────────────────         ──────────────────
       [Split]            [Same Prompt x N]
     ┌──┴──┐              ┌───┬───┬───┐
  [W1] [W2] [W3]         [A] [B] [C]
     └──┬──┘              └───┴───┴───┘
       [Merge]                  │
                           [Judge/Merge]

The key insight from combining all four views: sequential depth controls latency, fan-out width controls cost, routers control conditional branching, and ensembles trade cost for answer quality. These dimensions are independent, which means you can have a pipeline that sequences a router into a fan-out into an ensemble — all in one flow.

💡 Mental Model: Think of these four patterns the way a software engineer thinks about loops, conditionals, parallel threads, and voting algorithms. None of them is a complete program on its own. They combine. The question is not "which pattern should I use?" but "which combination of patterns fits this task's shape?"
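To make the analogy concrete, here is a sketch of the four patterns as tiny Python combinators over plain functions. The names `sequential`, `router`, `fan_out`, `majority`, and `ensemble` are hypothetical; the lambdas stand in for LLM calls, and the ensemble uses simple majority voting over hashable answers (one possible aggregation; an LLM judge is another).

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

Step = Callable[[Any], Any]


def sequential(*steps: Step) -> Step:
    """Loop analogue: feed each step's output into the next."""
    def run(x: Any) -> Any:
        for step in steps:
            x = step(x)
        return x
    return run


def router(classify: Callable[[Any], str], routes: dict[str, Step]) -> Step:
    """Conditional analogue: pick exactly one branch based on a label."""
    def run(x: Any) -> Any:
        return routes[classify(x)](x)
    return run


def fan_out(workers: list[Step], merge: Callable[[list[Any]], Any]) -> Step:
    """Parallel-threads analogue: run independent workers, then merge."""
    def run(x: Any) -> Any:
        with ThreadPoolExecutor(max_workers=len(workers)) as pool:
            results = list(pool.map(lambda w: w(x), workers))  # keeps worker order
        return merge(results)
    return run


def majority(answers: list[Any]) -> Any:
    """Most common answer; ties go to the answer encountered first."""
    return Counter(answers).most_common(1)[0][0]


def ensemble(candidates: list[Step]) -> Step:
    """Voting analogue: a fan-out of N candidates merged by majority vote."""
    return fan_out(candidates, merge=majority)


pipeline = sequential(
    str.strip,
    router(
        classify=lambda q: "math" if q[0].isdigit() else "text",
        routes={
            "math": ensemble([lambda q: "4", lambda q: "4", lambda q: "5"]),
            "text": fan_out([str.upper, str.lower], merge=" | ".join),
        },
    ),
)
print(pipeline("  2+2  "))  # 4
print(pipeline(" Hi "))     # HI | hi
```

Because each combinator returns an ordinary step function, they nest freely: the router here is one stage of a sequential chain, and its branches are an ensemble and a fan-out.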


Core Design Principles That Cut Across All Patterns

Principle 1: Explicit Data Contracts Are Your Primary Defense Against Silent Failures

Data contracts — typed, validated schemas that define what one step produces and what the next step expects — are the most important structural discipline in multi-step pipelines. Without them, a malformed output from Step 2 silently propagates to Step 3, corrupts Step 4, and produces a final answer that is wrong in a way that is hard to trace back to its origin.

The concrete failure looks like this: Step 2 returns a JSON object with a key named summary instead of the expected research_summary. Step 3 looks for research_summary, finds nothing, substitutes an empty string, and generates a confident-sounding draft from nothing. The pipeline completes. No exception is raised. The output is plausible-looking garbage.
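The failure is easy to reproduce with the standard library alone. This sketch uses the renamed key from the scenario above and a minimal, hypothetical `check_contract` helper (an illustration, not a replacement for a schema library) to contrast the silent path with a boundary check:

```python
import json

# Step 2's output renamed the key: "summary" instead of "research_summary".
step2_raw = '{"summary": "Competitor A prices per seat.", "key_claims": []}'
step2 = json.loads(step2_raw)

# Silent path: .get() substitutes an empty string and the pipeline keeps going.
research_summary = step2.get("research_summary", "")
draft_prompt = "Write a memo based on this research:\n" + research_summary
print(repr(research_summary))  # '' -> Step 3 would draft from nothing

# Boundary path: check the contract where Step 2's output is produced.
REQUIRED_KEYS = {"research_summary", "key_claims"}


def check_contract(output: dict, required: set) -> None:
    """Raise immediately if any required key is missing from a step's output."""
    missing = required.difference(output)  # iterating a dict yields its keys
    if missing:
        raise ValueError(f"step output missing keys: {sorted(missing)}")


try:
    check_contract(step2, REQUIRED_KEYS)
except ValueError as err:
    print(err)  # step output missing keys: ['research_summary']
```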

Per-step validation — catching schema violations at the boundary where they occur — eliminates this class of failure. The following example shows the pattern using Pydantic, which provides both schema definition and runtime validation in one step:

import json
from typing import TypeVar

from pydantic import BaseModel, Field, ValidationError

## ── Data contracts defined once, used at every boundary ──────────────────────

class ResearchOutput(BaseModel):
    research_summary: str
    key_claims: list[str]
    confidence: float = Field(ge=0.0, le=1.0)  # range is enforced, not just documented

class DraftOutput(BaseModel):
    draft_text: str
    word_count: int
    sources_cited: list[str]

# Type variable so the helper returns the same model type it was given.
M = TypeVar("M", bound=BaseModel)

## ── Boundary validation helper ────────────────────────────────────────────────

def validate_step_output(raw_output: str, model: type[M]) -> M:
    """
    Parses and validates raw LLM JSON output against a Pydantic model.
    Raises RuntimeError (chained from the JSON or validation error) as soon as
    the output violates the contract, so the failure surfaces at the step that
    caused it, not three steps later.
    """
    try:
        data = json.loads(raw_output)
        return model.model_validate(data)
    except (json.JSONDecodeError, ValidationError) as e:
        # In production, you'd log the raw output and the error together
        # to make debugging tractable.
        raise RuntimeError(f"Step output failed validation: {e}") from e

## ── Usage at a pipeline boundary ─────────────────────────────────────────────
# call_llm, build_research_prompt, and build_draft_prompt are placeholders for
# your own LLM client and prompt builders; they are not defined here.

def research_step(topic: str) -> ResearchOutput:
    raw = call_llm(build_research_prompt(topic))  # returns a JSON string
    return validate_step_output(raw, ResearchOutput)  # fails fast if malformed

def draft_step(research: ResearchOutput) -> DraftOutput:
    # The next step receives a typed object, not a raw string.
    # This makes it impossible to accidentally pass an empty string
    # where research_summary is expected.
    raw = call_llm(build_draft_prompt(research.research_summary, research.key_claims))
    return validate_step_output(raw, DraftOutput)

This pattern (define a schema per step, validate at each boundary, pass typed objects between steps) is one of the most effective structural practices in multi-step pipelines. It is not glamorous, but its absence is a common root cause of the silent errors that appear in production pipelines.

🎯 Key Principle: Validate at the boundary where failure originates, not at the end where failure manifests. A pipeline that validates only its final output gets harder to debug with every step it adds, because each step is one more place the error could have started.


Principle 2: Cost and Latency Must Be Estimated Before Committing to a Design

Sequential depth and fan-out width are the two independent dials that drive cost and latency. Both scale predictably, so a design that would be too expensive or too slow to be practical can be spotted before anything is built.

Sequential depth drives latency linearly. If each step takes 2 seconds and you have 6 steps, your minimum pipeline latency is 12 seconds, regardless of parallelism, because each step depends on the previous one's output. Add a step, add latency — unconditionally.

Fan-out width drives cost linearly (and latency only up to the duration of the longest parallel call). If you fan out to 5 workers, you make 5 LLM calls with 5× the token cost. Wider fan-out costs more money. Latency stays approximately constant if the calls are truly parallel.
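A small simulation shows both effects. It assumes each LLM call is I/O-bound and can be stood in for by `asyncio.sleep`; `fake_llm_call` and the 0.1-second latency are placeholders, not measurements.

```python
import asyncio
import time

CALL_LATENCY_S = 0.1  # placeholder latency for one simulated LLM call


async def fake_llm_call(name: str) -> str:
    await asyncio.sleep(CALL_LATENCY_S)  # stands in for network + generation time
    return f"{name} done"


async def run_sequential(names: list[str]) -> list[str]:
    # Each call starts only after the previous one finishes: latencies add up.
    return [await fake_llm_call(n) for n in names]


async def run_fan_out(names: list[str]) -> list[str]:
    # All calls are in flight at once: total latency is about one call.
    return list(await asyncio.gather(*(fake_llm_call(n) for n in names)))


def timed(runner, names: list[str]) -> float:
    start = time.perf_counter()
    asyncio.run(runner(names))
    return time.perf_counter() - start


names = ["w1", "w2", "w3", "w4", "w5"]
seq_s = timed(run_sequential, names)
par_s = timed(run_fan_out, names)
# Both variants make 5 calls, so token cost is identical; only latency differs.
print(f"sequential: {seq_s:.2f}s, fan-out: {par_s:.2f}s")  # roughly 0.5s vs 0.1s
```

Real fan-out latency is bounded by the slowest call rather than an average one, and provider rate limits can force some calls to queue, so the parallel figure is a best case.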

The following utility makes these relationships explicit before you commit to a design:

from dataclasses import dataclass

@dataclass
class PipelineEstimate:
    sequential_steps: int
    fanout_width: int
    ensemble_candidates: int
    avg_step_latency_s: float   # estimated seconds per LLM call
    avg_step_cost_usd: float    # estimated cost per LLM call in USD

    def estimated_latency_s(self) -> float:
        """
        Sequential steps dominate latency; fan-out and ensemble calls
        run in parallel, so only the longest matters. Each parallel block
        is approximated here as one avg_step_latency_s, and is counted
        only when the pipeline actually contains that block.
        """
        fanout_block = self.avg_step_latency_s if self.fanout_width > 0 else 0.0
        ensemble_block = self.avg_step_latency_s if self.ensemble_candidates > 0 else 0.0
        return self.sequential_steps * self.avg_step_latency_s + fanout_block + ensemble_block

    def estimated_cost_usd(self) -> float:
        """
        Every call costs money: sequential, fan-out, and ensemble alike.
        Judge or merge calls are not modeled separately; count them in sequential_steps.
        """
        return (
            self.sequential_steps
            + self.fanout_width
            + self.ensemble_candidates
        ) * self.avg_step_cost_usd

    def summary(self) -> str:
        return (
            f"Estimated latency : {self.estimated_latency_s():.1f}s\n"
            f"Estimated cost    : ${self.estimated_cost_usd():.4f} per invocation"
        )

## ── Example: research-and-draft pipeline ─────────────────────────────────────
estimate = PipelineEstimate(
    sequential_steps=4,    # classify → research → draft → review
    fanout_width=3,        # 3 parallel research sub-tasks
    ensemble_candidates=2, # 2 competing drafts for final judge
    avg_step_latency_s=2.5,
    avg_step_cost_usd=0.004,
)
print(estimate.summary())
## Estimated latency : 15.0s   (4 × 2.5s sequential + 2.5s fan-out + 2.5s ensemble)
## Estimated cost    : $0.0360 per invocation

(This is a simplified model — real-world estimates must account for token count variance, retry overhead, and rate limits. Use it as a planning tool, not a billing guarantee.)
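Retry overhead, one of the factors the note above mentions, can be folded into the estimate with a short expected-value calculation. This sketch assumes each attempt fails independently with the same probability (real failures are often correlated, for example during an outage) and that every attempt, including failed ones, is billed; `expected_attempts` is a hypothetical helper.

```python
def expected_attempts(failure_rate: float, max_attempts: int) -> float:
    """Expected number of calls for one step when each attempt fails
    independently with probability `failure_rate` and the step may run
    up to `max_attempts` times in total.

    Attempt i (1-based) runs only if the previous i - 1 attempts all failed,
    which happens with probability failure_rate ** (i - 1).
    """
    if not 0.0 <= failure_rate <= 1.0:
        raise ValueError("failure_rate must be in [0, 1]")
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    return sum(failure_rate ** i for i in range(max_attempts))


# Hypothetical numbers: 10% of calls fail validation, up to 3 attempts per step.
per_call_cost = 0.004
print(f"{expected_attempts(0.10, 3):.3f}")                   # 1.110
print(f"${expected_attempts(0.10, 3) * per_call_cost:.5f}")  # $0.00444
```

Multiplying each step's call count by this factor before summing gives a retry-aware version of the cost estimate; the same factor roughly scales that step's latency, too.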

The value of doing this arithmetic up front is that it surfaces design decisions that feel free but are not. Adding an ensemble step to improve quality costs 2–5× in LLM calls for that step. Adding two more sequential review steps adds 5 seconds of latency. Knowing this before writing the pipeline changes which designs are even worth prototyping.

⚠️ Common Mistake — Mistake 1: Designing for quality first and checking cost afterward. A pipeline that makes 12 LLM calls per user request may produce excellent output in testing and then prove uneconomical at scale. Estimate cost and latency at design time, not after implementation.



The Patterns Are Composable, Not Mutually Exclusive

One of the more common misreadings of a pattern vocabulary is treating the patterns as competing alternatives — as if you choose one and exclude the others. The four pipeline patterns in this lesson are composable building blocks. A production pipeline might use all four in a single flow:

[User Query]
     │
     ▼
[ROUTER: classify intent]        ← Router
     │
     ├─── factual query ──────► [sequential: retrieve → verify → format]  ← Sequential Chain
     │
     └─── research query ─────► [fan-out: 3 parallel sub-searches]        ← Fan-Out
                                        │
                                  [fan-in: merge findings]                 ← Fan-In
                                        │
                                 [ENSEMBLE: 2 competing drafts]            ← Ensemble
                                        │
                                  [judge: select best draft]
                                        │
                                  [sequential: edit → format]              ← Sequential Chain

The router selects the path. The sequential chain handles ordered dependencies. The fan-out parallelizes work that has no ordering dependency. The ensemble improves answer quality at a cost. None of these decisions precludes the others.

💡 Real-World Example: A content moderation pipeline might route by content type (router), run classifiers for toxicity, spam, and misinformation in parallel (fan-out), aggregate those signals (fan-in), and then sequence a final policy decision through a structured review step (sequential chain). Three of the four patterns, one pipeline, one purpose.
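A minimal sketch of the fan-out, fan-in, and policy-decision stages of that moderation pipeline (routing omitted). The three classifier functions are hypothetical keyword stubs standing in for LLM calls, and the 0.7 threshold is an arbitrary placeholder:

```python
import asyncio


# Hypothetical stand-ins for three independent LLM classifiers.
# Each returns a score in [0, 1]; real versions would await a model call.
async def toxicity(text: str) -> float:
    await asyncio.sleep(0)  # placeholder for the awaited LLM call
    return 0.9 if "idiot" in text.lower() else 0.1


async def spam(text: str) -> float:
    await asyncio.sleep(0)
    return 0.8 if "buy now" in text.lower() else 0.05


async def misinformation(text: str) -> float:
    await asyncio.sleep(0)
    return 0.2


async def moderate(text: str, threshold: float = 0.7) -> dict:
    # Fan-out: the three checks are independent, so run them concurrently.
    scores = await asyncio.gather(toxicity(text), spam(text), misinformation(text))
    # Fan-in: collect the signals into one structured record.
    signals = dict(zip(["toxicity", "spam", "misinformation"], scores))
    # Sequential policy step: decide only after every signal is in.
    flagged = sorted(name for name, score in signals.items() if score >= threshold)
    return {"signals": signals, "action": "remove" if flagged else "allow", "reasons": flagged}


print(asyncio.run(moderate("Buy now!!! limited offer")))
# {'signals': {'toxicity': 0.1, 'spam': 0.8, 'misinformation': 0.2}, 'action': 'remove', 'reasons': ['spam']}
```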

🎯 Key Principle: The question is never "which pattern?" — it is "where in this pipeline does each pattern's shape fit the task's shape?" A sequential chain fits where Step B genuinely depends on Step A's output. A fan-out fits where sub-tasks are independent. A router fits where the pipeline must branch on content. An ensemble fits where quality variance in a single call is the primary risk.


Summary Reference Table

📋 Quick Reference Card: Pipeline Pattern Comparison

| Pattern | Shape | Primary Cost Driver | Primary Latency Driver | Main Failure Mode | Best Fit |
|---|---|---|---|---|---|
| 🔗 Sequential Chain | A → B → C | Number of steps × token cost | Sum of all step latencies | Silent error propagation across steps | Ordered dependencies; refinement loops |
| 🔀 Router | Classify → Branch | Classification call + one branch | Classification latency + branch latency | Misclassification sends work down the wrong path | Conditional logic; specialization by content type |
| ⚡ Fan-Out / Fan-In | Split → Parallel Workers → Merge | Number of parallel workers × token cost | Slowest single worker (not the sum) | Merge logic fails to handle partial failures | Independent sub-tasks; parallel data gathering |
| 🎯 Ensemble | N independent calls → Judge | N × token cost per run, plus the judge call | Slowest candidate + judge latency | Judge itself introduces bias or fails | High-stakes answers; reducing single-call variance |
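One common mitigation for the router's main failure mode is to act on a classification only when it is confident, and otherwise send the request to a general-purpose path. A sketch, assuming the classifier returns a label plus a confidence score; LLM-reported confidence is often poorly calibrated, so the 0.8 threshold and the `RouteDecision` and `route` names are placeholders to tune against labeled routing data:

```python
from dataclasses import dataclass
from typing import Callable

Handler = Callable[[str], str]


@dataclass
class RouteDecision:
    label: str
    confidence: float  # 0.0 to 1.0, as reported by the hypothetical classifier


def route(
    decision: RouteDecision,
    branches: dict[str, Handler],
    fallback: Handler,
    min_confidence: float = 0.8,
) -> Handler:
    """Pick a specialist branch, but send low-confidence or unknown labels
    to a general-purpose fallback instead of a possibly wrong specialist."""
    if decision.confidence < min_confidence or decision.label not in branches:
        return fallback
    return branches[decision.label]


def billing(q: str) -> str:
    return f"[billing specialist] {q}"


def technical(q: str) -> str:
    return f"[technical specialist] {q}"


def general(q: str) -> str:
    return f"[general handler] {q}"


branches = {"billing": billing, "technical": technical}
handler = route(RouteDecision("billing", 0.55), branches, general)
print(handler("Why was I charged twice?"))  # [general handler] Why was I charged twice?
```

The trade-off is that the fallback path is usually less specialized, so the threshold sets how often you accept a generic answer to avoid a confidently wrong one.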


What You Now Understand That You Didn't Before

It is worth being explicit about the conceptual shift this lesson represents, because it is easy to absorb the patterns without absorbing the underlying reframe.

Before this lesson, the natural mental model for using LLMs in software is a function call: send a prompt, receive a response, use the response. The pipeline is implicit, informal, and usually just a loop or a sequence of if statements around raw API calls.

After this lesson, you have a structural vocabulary that lets you reason about multi-step agent behavior the way you reason about software architecture. The four patterns are the primitives. Data contracts are the interfaces. Cost and latency are first-class design constraints, not afterthoughts. Failure propagates through boundaries in predictable ways that explicit validation can intercept.

This shift matters because it changes what questions you ask at design time:

🧠 Instead of "What prompt should I write?", ask: "What is the shape of this task, and which pipeline pattern fits that shape?"

📚 Instead of "Why is my output wrong?", ask: "Which step violated its output contract, and did validation catch it at the boundary?"

🔧 Instead of "This is slow, what can I do?", ask: "Am I paying sequential latency for work that could be parallelized with fan-out?"

🎯 Instead of "My answer quality is inconsistent", ask: "Is this a case where ensemble aggregation justifies the additional cost?"

These are architectural questions. They exist at a level above any specific prompt or model. The vocabulary from this lesson is what makes those questions askable.


Final Critical Points

⚠️ The most dangerous failure mode in multi-step pipelines is not a crash — it is plausible-looking wrong output. A pipeline that raises an exception is easy to debug. A pipeline that silently propagates a malformed intermediate result and produces a confident-sounding final answer is hard to detect and harder to trust. Per-step validation is the structural answer to this problem. Without it, you are relying on the final output looking wrong enough to catch manually — which it often does not.

⚠️ Composability creates a combinatorial design space that is easy to over-engineer. The ability to combine all four patterns in a single pipeline does not mean every pipeline should. A three-step sequential chain often outperforms a more elaborate design because it has fewer failure surfaces, lower latency, and lower cost. Start with the simplest structure that fits the task's shape, then add complexity only where a specific limitation requires it.

⚠️ Fan-out without a defined merge strategy is half a design. It is common to prototype a fan-out — spin up parallel workers, collect results — and then realize that the merge logic (how to combine conflicting outputs, how to handle a worker failure, how to weight sources) is harder than the fan-out itself. Design the merge before committing to the fan-out.
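Designing the merge first means deciding, before any worker runs, what happens when some of them fail. A sketch of one such policy, under assumptions: each worker returns a list of findings or an error string, duplicates are dropped by exact match, and `min_successes` is a hypothetical quorum you would set per pipeline. Conflict resolution and source weighting are left out.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class WorkerResult:
    worker: str
    output: Optional[list[Any]] = None  # findings, when the worker succeeded
    error: Optional[str] = None         # error description, when it failed


@dataclass
class MergeResult:
    merged: list[Any]
    failed_workers: list[str] = field(default_factory=list)


def merge_findings(results: list[WorkerResult], min_successes: int) -> MergeResult:
    """Fan-in with an explicit partial-failure policy: keep successful outputs,
    record which workers failed, and refuse to merge below the quorum."""
    ok = [r for r in results if r.error is None]
    failed = [r.worker for r in results if r.error is not None]
    if len(ok) < min_successes:
        raise RuntimeError(
            f"only {len(ok)} of {len(results)} workers succeeded; failed: {failed}"
        )
    merged: list[Any] = []
    for r in ok:
        for item in r.output or []:
            if item not in merged:  # drop exact duplicates, keep first-seen order
                merged.append(item)
    return MergeResult(merged=merged, failed_workers=failed)


results = [
    WorkerResult("pricing", output=["A: per-seat", "B: flat fee"]),
    WorkerResult("features", output=["A: SSO", "A: per-seat"]),
    WorkerResult("reviews", error="timeout"),
]
print(merge_findings(results, min_successes=2))
# MergeResult(merged=['A: per-seat', 'B: flat fee', 'A: SSO'], failed_workers=['reviews'])
```

Returning `failed_workers` alongside the merged findings lets downstream steps, or a human reviewer, see that the result is partial instead of assuming it is complete.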

🧠 Mnemonic: "Validate early, estimate honestly, compose deliberately." These three phrases cover the primary failure modes this lesson has addressed: silent error propagation (validate early), cost and latency surprises (estimate honestly), and over-engineered pipelines (compose deliberately).


Practical Next Steps

The structural vocabulary from this lesson points directly to three concrete actions you can take before moving to the next lesson:

1. Audit an existing pipeline you own for missing data contracts. If you have any multi-step LLM workflow, even a simple two-step chain, check whether each boundary has an explicit schema and per-step validation. Missing contracts are one of the most common sources of hard-to-debug failures. Adding Pydantic models at each boundary is an afternoon's work and often surfaces bugs that were already present but invisible.

2. Run a cost and latency estimate for your next pipeline design before writing code. Use the estimation pattern from this lesson — count sequential steps, count fan-out width, count ensemble candidates, multiply by average call cost and latency. If the numbers are surprising, redesign before implementing. This is significantly cheaper than discovering the numbers after the pipeline is built.

3. Map one real task to the pattern vocabulary. Take a task you are currently solving (or considering solving) with LLMs and ask: what is its shape? Does it have ordered dependencies (sequential chain)? Independent sub-tasks (fan-out)? Content-conditional branching (router)? High-stakes answers where variance matters (ensemble)? Naming the shape before writing code is the practice that the pipeline pattern vocabulary is designed to enable.


What Comes Next

This lesson established the structural vocabulary. The next two lessons go deeper into the mechanics of individual patterns with implementation detail this overview could not provide.

Prompt Chaining Patterns covers the mechanics of sequential chains and routers in greater depth: how to construct prompts that reliably produce structured output at each step, how to design router classification prompts that minimize misrouting, how to handle state accumulation across a long chain without hitting context limits, and how to structure retry logic at the step level without duplicating work.

Fan-Out and Fan-In Workflows covers the mechanics of parallelism in depth: how to implement fan-out using async execution patterns, how to write merge functions that handle partial failures gracefully, how to set concurrency limits that prevent rate-limit errors, and how to design fan-in aggregation for different types of output (ranked lists, conflicting facts, structured data).

Both lessons assume the vocabulary and design principles from this lesson. The patterns introduced here are the conceptual scaffolding; those lessons provide the implementation detail.

💡 Pro Tip: As you move into those lessons, keep the summary table from this section nearby. When a specific implementation detail from the next lessons feels unfamiliar, locating it on that table — "this is fan-in merge logic, which is the hardest part of fan-out/fan-in" — will make the detail easier to place in context and easier to retain.