Prompt Chaining & Parallel Workflows

Build multi-step agent pipelines using sequential chains, routers, fan-out/fan-in, and ensemble patterns.

Why Multi-Step Agent Pipelines Exist

Imagine asking a single contractor to simultaneously design the architecture, pour the foundation, wire the electrical, and pass the building inspection — all in one uninterrupted work session, with no checkpoints and no way to catch a measurement error before the walls go up. The analogy sounds absurd for construction, yet it describes exactly how many developers first approach complex tasks with language models: one enormous prompt, one response, and hope that everything came out right. When the output is wrong — and with genuinely complex tasks, it often is — there is no seam in the process where the error can be isolated. Multi-step agent pipelines exist to solve this problem by giving complex work the same structure that complex human projects have always had: stages, checkpoints, specialists, and the ability to catch and correct mistakes before they propagate.

The Three Failure Modes of Single-Call Complexity

Context Window Pressure

Every language model call operates within a context window: a finite number of tokens that can be held in view simultaneously. For tasks that require retrieving and reasoning over large codebases, long documents, multi-turn conversation histories, and tool outputs at the same time, this limit becomes a hard constraint. Models under heavy context pressure exhibit a well-documented pattern sometimes called the lost-in-the-middle effect: information positioned in the middle of a very long prompt is attended to less reliably than information at the start or end.

The architectural implication is that decomposing work reduces the information each step needs to hold simultaneously. A step that only needs to extract entity names from a document doesn't need the full conversation history. By designing steps with narrower scope, each step operates within a context window that has breathing room — and the model's attention is focused precisely on what matters for that step.
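The sketch below makes the size difference concrete. It rests on two assumptions. First, it uses a rough 4-characters-per-token rule of thumb rather than a real tokenizer. Second, the helper names are hypothetical. The scoped extraction step's prompt is built from the document alone, while the single-call prompt drags the whole conversation history along with it.

```python
# Rough rule of thumb (assumption): about 4 characters per token for English text.
# Real budgeting should use the target model's own tokenizer.
CHARS_PER_TOKEN = 4

def estimate_tokens(text: str) -> int:
    """Very rough token estimate, for illustration only."""
    return len(text) // CHARS_PER_TOKEN + 1

def build_single_call_prompt(history: list[str], document: str, task: str) -> str:
    # Everything the whole task might touch, held in view at once.
    return "\n".join(history) + "\n\n" + document + "\n\nTask: " + task

def build_extraction_prompt(document: str) -> str:
    # Hypothetical scoped step: entity extraction needs the document and nothing else.
    return "Extract every entity name from the document below.\n\n" + document

history = [f"turn {i}: " + "earlier discussion " * 50 for i in range(40)]
document = "Acme Corp hired Jane Doe in Berlin. " * 20
full = build_single_call_prompt(history, document, "Classify, retrieve, plan, and answer.")
scoped = build_extraction_prompt(document)
print(estimate_tokens(full), estimate_tokens(scoped))
```

The scoped prompt comes out at a small fraction of the full one. That leaves headroom in the context window, so nothing the step actually needs ends up buried in the middle.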

Accuracy Degradation on Multi-Step Reasoning

When a single prompt asks a model to perform several distinct reasoning operations in sequence — classify a user's intent, retrieve relevant context, formulate a plan, then draft a response — errors in the early operations compound. A misclassification in step one means the retrieved context is wrong; wrong context means the plan is built on faulty assumptions; a faulty plan means the final draft is plausible-sounding but incorrect. The model has no mechanism for catching this drift because it never pauses to verify intermediate conclusions.

This is distinct from the context window problem. Even with unlimited context, chaining multiple reasoning steps into a single inference pass accumulates uncertainty. Each step has its own error rate, and the chance that every step is correct is roughly the product of the individual success rates. When the steps are collapsed into one prompt, there is also no point at which an early error can be caught and corrected before later steps build on it.
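Here is that arithmetic made concrete. It assumes, for simplicity, that each step fails independently. The 95% per-step accuracy is an illustrative number, not a measured one.

```python
import math

def end_to_end_success(step_success_rates: list[float]) -> float:
    """Probability that every step is correct, assuming independent failures."""
    return math.prod(step_success_rates)

# Illustrative (not measured): four reasoning steps, each right 95% of the time.
rates = [0.95, 0.95, 0.95, 0.95]
print(f"{end_to_end_success(rates):.3f}")  # 0.815
```

Nearly one run in five ends wrong. In a single-call design, nothing marks which step caused the failure.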

## Illustrating the single-call collapse problem
## This is the naive approach we want to move away from

def single_call_pipeline(user_query: str, documents: list[str]) -> str:
    """
    Attempts to classify, retrieve, reason, and respond in one prompt.
    Works acceptably on simple queries; degrades on complex ones.
    """
    combined_docs = "\n\n".join(documents)  # May easily exceed context budget
    prompt = f"""
    Given the following documents:
    {combined_docs}

    User query: {user_query}

    Please:
    1. Determine the type of question being asked
    2. Identify the most relevant passages
    3. Reason through the answer step by step
    4. Draft a clear, accurate response
    """
    # All four subtasks in one call: an error in subtask 1 corrupts subtasks 2, 3, and 4.
    # call_llm is a placeholder for whatever LLM client function your project uses.
    return call_llm(prompt)

Lack of Intermediate Validation

The third failure mode is the one that makes the first two difficult to debug: there are no seams where correctness can be checked. In a single-call pipeline, the first observable artifact is the final output. If that output is wrong, you have no information about where in the reasoning chain things went sideways.

A pipeline with explicit steps can validate each output before it becomes the next step's input. A classification result can be checked against an allowed taxonomy. A retrieved set of passages can be scored for relevance before being passed downstream. None of these checks are possible without explicit step boundaries.
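A validation gate can be an ordinary function. The minimal sketch below checks a classifier's raw output against an allowed taxonomy before it is passed downstream. The label set and the `StepValidationError` name are hypothetical.

```python
ALLOWED_INTENTS = {"billing", "technical", "feature_request"}  # hypothetical taxonomy

class StepValidationError(ValueError):
    """Raised when a step's output fails its gate; records which step failed."""
    def __init__(self, step: str, message: str) -> None:
        super().__init__(f"[{step}] {message}")
        self.step = step

def validate_classification(raw_label: str) -> str:
    """Normalize an LLM's label and reject anything outside the taxonomy."""
    label = raw_label.strip().lower()
    if label not in ALLOWED_INTENTS:
        raise StepValidationError("classify", f"unexpected label {raw_label!r}")
    return label

print(validate_classification("  Billing\n"))  # billing
try:
    validate_classification("refund please")
except StepValidationError as err:
    print(err)  # [classify] unexpected label 'refund please'
```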

💡 Mental Model: Think of step boundaries as type-checking checkpoints for reasoning. Just as a type system catches incompatible data before a function call goes wrong at runtime, validation gates at step boundaries catch reasoning errors before they corrupt downstream steps.

The Core Tradeoff: One Call vs. Many

Decomposing work into multiple calls is not free. Each call adds latency, adds token overhead (instructions and shared context are often repeated in every call), and introduces a new failure point. The tradeoff can be stated simply: a single call is preferred when the task is genuinely atomic; a pipeline is preferred when the task requires distinct reasoning modes or when correctness at intermediate stages can be verified.

Atomic tasks are things like "translate this sentence" or "classify this review as positive or negative." There is one thing being done, it can be done well in one inference pass, and there is no meaningful intermediate state to validate.

Non-atomic tasks are things like "given this codebase, identify the root cause of this bug, propose three fixes, evaluate each for correctness and side effects, and implement the best one." This task requires different kinds of reasoning at each stage, and the correctness of each stage is independently verifiable — which means a pipeline can catch errors before they propagate.

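## Contrasting approaches for a non-atomic task
## call_llm, identify_root_cause, generate_fix_candidates, evaluate_candidates,
## implement_best_fix, and validate_step_output are placeholders for your own code.
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class PipelineStep:
    name: str
    fn: Callable[[dict[str, Any]], dict[str, Any]]  # takes the state dict, returns it updated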

## --- Approach A: Single call (fragile for complex tasks) ---
def analyze_bug_single_call(codebase: str, error_trace: str) -> str:
    prompt = f"""
    Codebase: {codebase}
    Error: {error_trace}
    Identify the root cause, propose three fixes, evaluate each,
    and implement the best one.
    """
    return call_llm(prompt)  # No checkpoints; errors invisible until final output


## --- Approach B: Decomposed pipeline (preferred for non-atomic tasks) ---
def analyze_bug_pipeline(codebase: str, error_trace: str) -> str:
    steps = [
        PipelineStep("root_cause", identify_root_cause),
        PipelineStep("propose_fixes", generate_fix_candidates),
        PipelineStep("evaluate_fixes", evaluate_candidates),
        PipelineStep("implement", implement_best_fix),
    ]
    state = {"codebase": codebase, "error": error_trace}
    for step in steps:
        state = step.fn(state)
        validate_step_output(step.name, state)  # Catch errors before propagating
    return state["final_output"]

Approach B is more verbose. It is also far easier to debug when something goes wrong: you know exactly which step produced the bad output, what its input was, and what it returned.

How Pipeline Structure Determines Where Errors Surface

The shape of a pipeline is not just an implementation detail — it is an error management strategy. Where you put step boundaries determines where errors can be caught.

Structure A: No intermediate validation

[Input] ──► [Stage 1] ──► [Stage 2] ──► [Stage 3] ──► [Output]
                                                           ▲
                                              First observable error point


Structure B: Validation gates between stages

[Input] ──► [Stage 1] ──► [Validate] ──► [Stage 2] ──► [Validate] ──► [Stage 3] ──► [Output]
                               ▲                            ▲
                          Error caught here           Error caught here

In Structure A, an error at Stage 1 corrupts Stage 2's input, which corrupts Stage 3's input, and the final output is wrong in a way that may look plausible. In Structure B, an error at Stage 1 is caught before Stage 2 runs — the pipeline can retry, log the failure, or escalate to a fallback strategy.
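The retry-then-fallback behavior can be wired up as sketched below, using hypothetical names. The function runs a step and validates its output. It retries a bounded number of times, then hands off to a fallback if every attempt fails. A real pipeline would call an LLM inside `step` and log failures somewhere more durable than a list.

```python
from typing import Any, Callable, Optional

State = dict[str, Any]

def run_gated_step(
    step: Callable[[State], State],
    validate: Callable[[State], bool],
    fallback: Callable[[State], State],
    state: State,
    max_attempts: int = 3,
    failures: Optional[list[str]] = None,
) -> State:
    """Run step up to max_attempts times; fall back if no attempt passes validation."""
    for attempt in range(1, max_attempts + 1):
        # Shallow copy: a failed attempt's top-level writes don't leak into the next try.
        candidate = step(dict(state))
        if validate(candidate):
            return candidate
        if failures is not None:
            failures.append(f"attempt {attempt} failed validation")  # stand-in for real logging
    return fallback(dict(state))

# Hypothetical flaky step: returns an empty result on its first call only.
calls = {"n": 0}
def flaky_extract(s: State) -> State:
    calls["n"] += 1
    s["entities"] = [] if calls["n"] == 1 else ["Acme Corp"]
    return s

log: list[str] = []
result = run_gated_step(
    flaky_extract,
    validate=lambda s: bool(s.get("entities")),
    fallback=lambda s: {**s, "entities": [], "escalated": True},
    state={"text": "Acme Corp announced..."},
    failures=log,
)
print(result["entities"], log)  # ['Acme Corp'] ['attempt 1 failed validation']
```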

⚠️ Common Mistake: Adding pipeline structure without adding validation gates. Decomposing a task into steps provides no reliability benefit if each step's output is passed downstream without checking. Step boundaries are only valuable when they are also validation checkpoints.

A Taxonomy of Pipeline Shapes

This lesson covers four primary structures. They are composable: real pipelines are typically combinations of them rather than pure instances of one.

Sequential Chains

A sequential chain passes the output of one step as the input to the next, in a fixed linear order. Each step can validate, transform, or enrich the data before forwarding it.

[Step 1: Extract] ──► [Step 2: Classify] ──► [Step 3: Summarize] ──► [Output]

Routers

A router inspects an intermediate output and selects which downstream step or branch to invoke next, enabling conditional logic at the pipeline level without requiring every step to handle every possible case.

                          ┌──► [Branch A: Technical Support]
                          │
[Input] ──► [Classifier] ─┼──► [Branch B: Billing Inquiry]
                          │
                          └──► [Branch C: General Question]

Fan-Out / Fan-In

A fan-out/fan-in pattern splits a task into parallel subtasks, processes them concurrently, and then merges the results back into a single output.

                      ┌──► [Subtask A] ──┐
                      │                  │
[Input] ──► [Split] ──┼──► [Subtask B] ──┼──► [Merge] ──► [Output]
                      │                  │
                      └──► [Subtask C] ──┘

Ensembles

An ensemble runs multiple independent generations — potentially with varied prompts, temperatures, or models — and combines their results into a single output. Ensembles are a specific application of the fan-out/fan-in pattern, specialized for quality improvement through redundancy.

                        ┌──► [Generation 1 (temp=0.3)] ──┐
                        │                                │
[Input] ──► [Fan-Out] ──┼──► [Generation 2 (temp=0.7)] ──┼──► [Aggregator] ──► [Output]
                        │                                │
                        └──► [Generation 3 (temp=1.0)] ──┘

Shape            | Structure                                    | Primary Benefit                     | Primary Cost
-----------------|----------------------------------------------|-------------------------------------|-----------------------
Sequential Chain | Linear, one step feeds next                  | Error isolation, transformation     | Added latency per step
Router           | Conditional branching at decision point      | Task-appropriate context per branch | Classification overhead
Fan-Out/Fan-In   | Parallel subtasks, merged result             | Throughput, parallelism             | Merge complexity
Ensemble         | Multiple independent generations, aggregated | Output quality, robustness          | N× token cost

🤔 Did you know? These four shapes map closely to patterns in distributed systems design — sequential chains resemble pipeline stages in stream processing, routers resemble message brokers with topic-based dispatch, fan-out/fan-in resembles MapReduce, and ensembles resemble redundant service calls with quorum voting. The underlying intuition — decompose, specialize, parallelize, validate — appears across systems design at many scales.

It would be tidy if real pipelines used exactly one of these shapes. They don't. A production pipeline for a complex agentic task typically contains a sequential chain at the top level, where one step is a router that dispatches to specialized sub-chains, and one of those sub-chains uses fan-out/fan-in to process multiple documents in parallel, with an ensemble at the aggregation step. Understanding what each shape does and when to reach for it is the prerequisite for designing pipelines that are robust at the composition level.


Sequential Pipelines and Routing

With the problem space established, we can work through the two foundational structural patterns in detail. The sequential pipeline — where steps run one after another in a fixed order — is the backbone on which everything else is built. The router — where an intermediate result determines which branch runs next — is the first branching pattern layered on top of it.

Sequential Pipelines: Step-by-Step Transformation

A sequential pipeline is an ordered list of processing steps where the output of step N becomes the input of step N+1. Each step receives some state, performs a focused transformation, and writes its result back to that state before passing it forward.

User Input
    │
    ▼
┌─────────────┐
│   Step 1    │  ← Extract entities from raw text
│  (Extract)  │
└──────┬──────┘
       │  structured entities
       ▼
┌─────────────┐
│   Step 2    │  ← Validate entities against schema
│ (Validate)  │
└──────┬──────┘
       │  validated payload
       ▼
┌─────────────┐
│   Step 3    │  ← Generate a user-facing summary
│ (Summarize) │
└──────┬──────┘
       │
       ▼
   Final Output

Each box is independently testable. If step 2's validator raises an error, you know exactly where the problem is without re-running the expensive extraction step.

🎯 Key Principle: Each step in a sequential pipeline should do exactly one thing and be verifiable on its own. Steps are not limited to LLM calls — a step is any callable that accepts the current pipeline state and returns an updated version. Steps commonly transform data, enrich it with retrieved context, validate it against constraints, or generate new content via an LLM call.

The separation matters because it keeps LLM prompts narrow. A prompt that does extraction and validation and summarization accumulates implicit requirements that are hard to unit-test and easy to break silently.

Implementing a Minimal Sequential Chain in Python

from typing import Any

def extract_intent(context: dict[str, Any]) -> dict[str, Any]:
    """Step 1: Parse the user's raw input into a structured intent."""
    raw_input = context["user_input"]
    context["intent"] = {
        "action": "lookup",
        "entity": raw_input.strip().lower(),
    }
    return context


def validate_intent(context: dict[str, Any]) -> dict[str, Any]:
    """Step 2: Confirm the extracted intent meets minimum quality criteria."""
    intent = context.get("intent", {})
    if not intent.get("entity"):
        raise ValueError("Extraction produced an empty entity — aborting pipeline.")
    context["intent_valid"] = True
    return context


def generate_response(context: dict[str, Any]) -> dict[str, Any]:
    """Step 3: Produce the final user-facing response."""
    entity = context["intent"]["entity"]
    context["response"] = f"Here is the information you requested about '{entity}'."
    return context


def run_pipeline(steps: list, initial_context: dict[str, Any]) -> dict[str, Any]:
    """Execute steps in order, threading context through each one."""
    context = initial_context.copy()
    for step in steps:
        context = step(context)
    return context


pipeline = [extract_intent, validate_intent, generate_response]
result = run_pipeline(
    steps=pipeline,
    initial_context={"user_input": "Python decorators"},
)
print(result["response"])
## → "Here is the information you requested about 'python decorators'."

This example is intentionally simplified: it omits async execution, error recovery, and typed state (covered in Passing State and Context Between Steps). The structural pattern is the part that carries over to real pipelines: a list of callables, a shared context dict threaded through each one, and an explicit loop that makes the data flow visible.

⚠️ Common Mistake: Passing only the previous step's output to the next step rather than the full context object. It looks clean until step 3 needs something from step 1 — at which point you either break the interface or start smuggling data through step 2 for no reason. Thread the full context; let each step read only what it needs.

Routers: Conditional Dispatch Without Cluttered Prompts

A sequential pipeline works well when every input should travel the same path. Many real tasks don't fit that constraint. A customer support agent might need to handle billing questions, technical troubleshooting, and feature requests — and these three task types need different system prompts, different retrieved context, and different tools. Forcing them through a single generic prompt either bloats the prompt with conditionals or produces weaker results than a focused sub-prompt would.

A router solves this by inspecting an intermediate output — usually a classification — and selecting which downstream step or sub-chain to invoke next. The conditional logic lives in the router function, not scattered across individual prompts.

             User Input
                │
                ▼
         ┌──────────────┐
         │  Classifier  │  ← Single LLM call: "What category is this?"
         └──────┬───────┘
                │
     ┌──────────┼──────────┐
     │          │          │
     ▼          ▼          ▼
┌─────────┐ ┌───────┐ ┌──────────┐
│ Billing │ │  Tech │ │ Feature  │
│ Branch  │ │Branch │ │ Request  │
└────┬────┘ └───┬───┘ └────┬─────┘
     └──────────┴──────────┘
                │
                ▼
          Final Response

The classifier step is a cheap, fast, focused call: it only answers the question "which category does this input belong to?" Every downstream branch can then assume it is working with a pre-classified request, which lets each branch prompt stay narrow and precise.

The accuracy gains from routing are most visible when task types have genuinely different context requirements. A technical troubleshooting prompt that needs API documentation in its context window is actively hurt by including billing policies — and vice versa. Keeping irrelevant context out of the prompt isn't just about token cost; it reduces the chance that the model anchors on the wrong part of a long context.

Concrete Router Example: Three-Way Dispatch

from typing import Any, Callable
from enum import StrEnum

class RequestCategory(StrEnum):
    BILLING = "billing"
    TECHNICAL = "technical"
    FEATURE_REQUEST = "feature_request"


def classify_request(context: dict[str, Any]) -> dict[str, Any]:
    """
    Calls an LLM to classify the user request.
    Returns the category stored in context["category"].
    """
    user_input = context["user_input"].lower()  # case-insensitive keyword matching
    # Simulate an LLM classification call with simple keyword checks.
    if "invoice" in user_input or "charge" in user_input:
        context["category"] = RequestCategory.BILLING
    elif "error" in user_input or "crash" in user_input or "bug" in user_input:
        context["category"] = RequestCategory.TECHNICAL
    else:
        context["category"] = RequestCategory.FEATURE_REQUEST
    return context


def handle_billing(context: dict[str, Any]) -> dict[str, Any]:
    context["response"] = f"[Billing handler] Addressing: {context['user_input']}"
    return context

def handle_technical(context: dict[str, Any]) -> dict[str, Any]:
    context["response"] = f"[Technical handler] Addressing: {context['user_input']}"
    return context

def handle_feature_request(context: dict[str, Any]) -> dict[str, Any]:
    context["response"] = f"[Feature handler] Addressing: {context['user_input']}"
    return context


ROUTE_MAP: dict[RequestCategory, Callable] = {
    RequestCategory.BILLING: handle_billing,
    RequestCategory.TECHNICAL: handle_technical,
    RequestCategory.FEATURE_REQUEST: handle_feature_request,
}

def router_step(context: dict[str, Any]) -> dict[str, Any]:
    """
    Reads context['category'] and dispatches to the matching handler.
    Raises KeyError if the classifier produced an unexpected category.
    """
    category = context["category"]
    handler = ROUTE_MAP[category]  # Fails loudly on unknown categories
    return handler(context)


def run_pipeline(steps: list, initial_context: dict[str, Any]) -> dict[str, Any]:
    context = initial_context.copy()
    for step in steps:
        context = step(context)
    return context


pipeline = [classify_request, router_step]
for test_input in [
    "I was charged twice on my last invoice.",
    "The app crashes when I upload a file larger than 10MB.",
    "Could you add dark mode to the dashboard?",
]:
    result = run_pipeline(pipeline, {"user_input": test_input})
    print(result["response"])

## → [Billing handler] Addressing: I was charged twice on my last invoice.
## → [Technical handler] Addressing: The app crashes when I upload a file larger than 10MB.
## → [Feature handler] Addressing: Could you add dark mode to the dashboard?

Several design choices here are worth naming:

  • ROUTE_MAP is a plain dict keyed on RequestCategory values, not a chain of if/elif statements. Adding a fourth category means adding one entry to the map.
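  • Using StrEnum for categories means a misspelled member name such as RequestCategory.BILING raises AttributeError as soon as that line executes (at import time for module-level references like ROUTE_MAP). A misspelled plain string, by contrast, would silently produce a key that never matches.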
  • The router raises on unknown categories. A router that silently falls through to a default handler hides classification failures. Surface "other" as a signal that the classifier needs improvement, not as a catch-all.

⚠️ Common Mistake: Using a router as a catch-all fallback mechanism. If your classifier produces a category called "other" and routes it to a generic handler, you've recreated the single-catch-all prompt problem inside your pipeline.

When to Choose Routing Over a Single Prompt

Routing adds a step — a classifier call that costs tokens and latency before any real work happens. That overhead is worth paying when two or more of these conditions hold:

Criterion      | Single Prompt              | Router
---------------|----------------------------|--------------------------------------
Task type      | One well-defined task      | Multiple structurally different tasks
Tools needed   | Same tools every time      | Different tools per task type
Context needed | Same documents every time  | Different retrieved context per type
Constraints    | Single set of instructions | Different safety or tone requirements
Prompt length  | Stays focused              | Ballooning with conditionals

The most concrete signal that you need a router is prompt length inflation. When you look at a "handle everything" prompt and see instructions like "if the user is asking about billing, do X, but if they're asking about a technical error, do Y" — you are writing a router inside a prompt. Externalizing that logic into a classifier step almost always produces better results because each downstream prompt can now fill its context window with only what's relevant.

One practical heuristic: if writing a single prompt requires more than one if-style conditional instruction in the system message, a router is worth building.

Composing Chains and Routers

Sequential pipelines and routers are not alternatives to each other — they compose. A router's branches are typically sequential chains. A sequential chain might include a routing step partway through.

  Input
    │
    ▼
[Normalize Input]          ← sequential step
    │
    ▼
[Classify Topic]           ← sequential step whose output drives the router
    │
    ├──► [Branch A: Steps 1→2→3]
    ├──► [Branch B: Steps 1→2]
    └──► [Branch C: Steps 1→2→3→4]
              │
              ▼
        [Format Output]    ← sequential step after branches rejoin
              │
              ▼
          Response

Keeping the final formatting step outside the branches means you maintain one output-formatting prompt rather than one per branch.
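The composition above can be sketched with two small helpers. `chain` wraps several steps into one step, and `route` dispatches on a context key, so a sub-chain can appear anywhere a single step can. Both helper names, and the keyword classifier and `note` stand-ins, are hypothetical.

```python
from typing import Any, Callable

Context = dict[str, Any]
Step = Callable[[Context], Context]

def chain(*steps: Step) -> Step:
    """Wrap several steps into one step, so a sub-chain can sit anywhere a step can."""
    def run(context: Context) -> Context:
        for step in steps:
            context = step(context)
        return context
    return run

def route(key: str, branches: dict[str, Step]) -> Step:
    """Dispatch on context[key]; an unknown label raises KeyError."""
    def run(context: Context) -> Context:
        return branches[context[key]](context)
    return run

def note(text: str) -> Step:
    """Hypothetical stand-in for a real branch step (an LLM call, a retrieval)."""
    def run(context: Context) -> Context:
        context.setdefault("notes", []).append(text)
        return context
    return run

def normalize(context: Context) -> Context:
    context["text"] = context["user_input"].strip().lower()
    return context

def classify(context: Context) -> Context:
    # Keyword stand-in for an LLM classifier.
    context["topic"] = "code" if "python" in context["text"] else "general"
    return context

def format_output(context: Context) -> Context:
    context["response"] = f"[{context['topic']}] " + ", ".join(context["notes"])
    return context

pipeline = chain(
    normalize,
    classify,
    route("topic", {
        "code": chain(note("retrieved docs"), note("drafted answer")),
        "general": chain(note("drafted answer")),
    }),
    format_output,  # one shared formatting step after the branches rejoin
)
print(pipeline({"user_input": "  How do Python decorators work? "})["response"])
# [code] retrieved docs, drafted answer
```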

Fan-out/fan-in — where multiple branches run concurrently rather than as alternatives — is a distinct pattern covered in the sections that follow.


Ensemble Patterns: Aggregating Multiple Outputs

Sequential chains and routers handle the case where each input follows one path through the pipeline. Ensembles address a different problem: for high-stakes decisions — classifying a support ticket's severity, generating a diagnosis summary — a single draw from the model's probability distribution may be excellent or it may be an outlier, and you often have no easy way to tell which. The ensemble pattern runs several independent calls and combines their outputs into one result that is more reliable than any individual response.

What Makes a Call "Independent"

Independence is the load-bearing word in the ensemble definition. Two calls are independent if neither one's output is conditioned on the other's. In practice you achieve this by never feeding one call's output into another (launching them concurrently enforces this naturally) and by varying at least one of three axes:

  • Temperature: higher temperature increases output diversity. N calls at temperature=0 tend to return identical or near-identical outputs and add little value.
  • Prompt variation: rephrase the instruction, change the persona, or alter the output format directive.
  • Model variation: mixing a fast small model with a larger one can reveal systematic errors one model makes that the other avoids.

User Request
     │
     ├──► [Call A: temp=0.7, prompt variant 1] ──► Output A ──┐
     │                                                       │
     ├──► [Call B: temp=0.9, prompt variant 2] ──► Output B ──┤
     │                                                       │
     └──► [Call C: temp=0.7, model variant]    ──► Output C ──┤
                                                             │
                                                ┌────────────▼────────────┐
                                                │    Aggregation Layer    │
                                                │ (vote / judge / merge)  │
                                                └────────────┬────────────┘
                                                             │
                                                       Final Output

Three Aggregation Strategies

Majority Vote

Majority vote is appropriate when the output space is discrete: classification labels, yes/no decisions, numeric rankings. Each call casts one vote, and the most common answer wins.

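💡 Mental Model: Think of majority vote as a jury. Each juror delivers an independent verdict, and the verdict with the most votes wins; there is no deliberation between calls. (Strictly speaking, the function below implements a plurality vote: with three or more possible labels, the winner may hold fewer than half the votes.)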

For discrete outputs, aggregation requires no additional LLM call:

from collections import Counter
from typing import Sequence

def majority_vote(answers: Sequence[str]) -> str:
    """Return the most common answer. Ties broken by first occurrence."""
    if not answers:
        raise ValueError("No answers to aggregate")
    counts = Counter(answers)
    return counts.most_common(1)[0][0]

candidates = ["urgent", "routine", "urgent"]
print(majority_vote(candidates))  # "urgent"

The limitation worth naming: majority vote fails silently when all candidates are wrong in the same way — a systematic bias in the prompt will produce a confident wrong answer. Majority vote detects variance, not bias.
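Majority vote measures agreement rather than correctness, so it can help to report how strong the agreement was. The minimal sketch below returns the winning label together with its vote share and flags low-consensus results for review. The 2/3 threshold and the `vote_with_agreement` name are assumptions for illustration. High agreement still says nothing about a shared bias.

```python
from collections import Counter
from typing import Sequence

def vote_with_agreement(
    answers: Sequence[str], threshold: float = 2 / 3
) -> tuple[str, float, bool]:
    """Return (winner, winner's share of votes, whether the share meets threshold)."""
    if not answers:
        raise ValueError("No answers to aggregate")
    winner, count = Counter(answers).most_common(1)[0]  # ties: first encountered wins
    share = count / len(answers)
    return winner, share, share >= threshold

print(vote_with_agreement(["urgent", "routine", "urgent"]))  # ('urgent', 0.6666666666666666, True)
print(vote_with_agreement(["urgent", "routine", "low"]))     # ('urgent', 0.3333333333333333, False)
```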

Best-of-N via a Judge Prompt

When outputs are free-form text — an explanation, a code snippet — there is no canonical space to vote over. Instead, use a judge prompt: a separate LLM call that receives all N candidates and returns the best one, evaluated against explicit criteria you provide.

Making the rubric explicit forces you to articulate what "best" means, which is independently valuable. Ask the judge to explain its choice in one sentence before returning the index — the explanation surfaces cases where all candidates are weak.
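The judge step can be sketched as two pieces of plain string handling, which are testable without any model. The sketch assumes a hypothetical reply format: the judge writes a one-sentence reason, then a final line such as `CHOICE: 2`. The function names are illustrative, and the LLM call itself is left out.

```python
import re
from typing import Sequence

def build_judge_prompt(question: str, candidates: Sequence[str], rubric: Sequence[str]) -> str:
    numbered = "\n\n".join(f"[{i}] {c}" for i, c in enumerate(candidates, start=1))
    criteria = "\n".join(f"- {r}" for r in rubric)
    return (
        f"Question: {question}\n\nCandidates:\n{numbered}\n\n"
        f"Judge the candidates against these criteria:\n{criteria}\n\n"
        "First explain your choice in one sentence. "
        "Then write a final line of the form CHOICE: <number>."
    )

def parse_judge_choice(reply: str, n_candidates: int) -> int:
    """Return the 0-based index the judge picked; raise if missing or out of range."""
    matches = re.findall(r"CHOICE:\s*(\d+)", reply)
    if not matches:
        raise ValueError("Judge reply has no CHOICE line")
    choice = int(matches[-1])  # the requested format puts the answer on the last line
    if not 1 <= choice <= n_candidates:
        raise ValueError(f"Judge chose {choice}, but only {n_candidates} candidates exist")
    return choice - 1

reply = "Candidate 2 is the only one that handles the empty-list case.\nCHOICE: 2"
print(parse_judge_choice(reply, 3))  # 1
```

Raising on a missing or out-of-range choice keeps a malformed judge reply from silently selecting the wrong candidate.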

⚠️ Common Mistake: Using the same model (with the same system prompt) as both generator and judge. The judge tends to prefer outputs that mirror its own generation style. Where possible, use a different system prompt or a different model for the judge call.

Synthesis

Synthesis is the most powerful and most expensive strategy. Rather than selecting one candidate, the aggregator asks an LLM to merge complementary strengths across candidates into a single superior response.

You are given {N} candidate responses to the following question.
Question: {original_question}

Candidates:
[1] {candidate_1}
[2] {candidate_2}
[3] {candidate_3}

Write a single response that captures the strongest elements of all
candidates while correcting any errors or omissions.

Synthesis is a full additional LLM call, adding both cost and latency on top of the N concurrent calls. Reserve it for cases where completeness visibly matters to the end user.
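The template above is written for exactly three candidates. The small builder sketched here fills in the candidate count and numbers however many candidates the ensemble actually produced. The name `build_synthesis_prompt` is hypothetical.

```python
from typing import Sequence

SYNTHESIS_TEMPLATE = """You are given {n} candidate responses to the following question.
Question: {question}

Candidates:
{candidates}

Write a single response that captures the strongest elements of all
candidates while correcting any errors or omissions."""

def build_synthesis_prompt(question: str, candidates: Sequence[str]) -> str:
    if not candidates:
        raise ValueError("Synthesis needs at least one candidate")
    numbered = "\n".join(f"[{i}] {c}" for i, c in enumerate(candidates, start=1))
    # Values substituted by str.format are not re-parsed, so braces inside candidates are safe.
    return SYNTHESIS_TEMPLATE.format(n=len(candidates), question=question, candidates=numbered)

print(build_synthesis_prompt("What does asyncio.gather return?", ["A list.", "Results in order."]))
```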

Implementing a Concurrent Ensemble in Python

The N calls must run concurrently, not sequentially. Otherwise the fan-out phase takes roughly N times as long as a single call, instead of about as long as the slowest one.

import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Sequence

@dataclass
class ScoredCandidate:
    content: str
    score: float

async def run_ensemble(
    prompt: str,
    generate: Callable[[str, float], Awaitable[str]],
    score: Callable[[str, str], Awaitable[float]],
    temperatures: Sequence[float] = (0.6, 0.8, 0.9),
) -> ScoredCandidate:
    """
    Run one completion per temperature concurrently, score each
    against the original prompt, and return the best.

    Args:
        prompt:       The user-facing task description.
        generate:     Async function (prompt, temperature) -> completion string.
        score:        Async function (prompt, candidate) -> float in [0, 1].
        temperatures: One call is launched per entry; vary for diversity.
    """
    # Fan out: all completions run concurrently
    completions: list[str] = await asyncio.gather(
        *[generate(prompt, t) for t in temperatures]
    )

    # Score each candidate concurrently
    scores: list[float] = await asyncio.gather(
        *[score(prompt, c) for c in completions]
    )

    candidates = [
        ScoredCandidate(content=c, score=s)
        for c, s in zip(completions, scores)
    ]
    return max(candidates, key=lambda x: x.score)

generate and score are injected as callables, keeping the ensemble logic independent of any specific SDK. Scoring also runs concurrently: if the scoring function is itself an LLM call, the scoring phase takes about as long as one scoring call rather than growing with the number of candidates. temperatures is a sequence rather than a count, making it natural to pass (0.0, 0.7, 0.9) when you want one (near-)deterministic baseline alongside two diverse candidates.

For the majority-vote case:

async def ensemble_majority_vote(
    prompt: str,
    generate: Callable[[str, float], Awaitable[str]],
    temperatures: Sequence[float] = (0.7, 0.8, 0.9),
) -> str:
    """Concurrent fan-out + majority vote for discrete-output tasks."""
    completions: list[str] = await asyncio.gather(
        *[generate(prompt, t) for t in temperatures]
    )
    # Normalize before voting — without this, "Urgent" and "urgent" count as different votes
    normalized = [c.strip().lower() for c in completions]
    return majority_vote(normalized)

Cost and Latency Profile

The arithmetic of ensembles is simple and should inform every decision to use one:

Single call:     (input tokens × input price) + (output tokens × output price)
Ensemble of N:   N × [(input tokens × input price) + (output tokens × output price)]
                 + aggregation overhead (judge or synthesis call, if used)

Latency:
  Single call:   T_call
  Ensemble of N: max(T_call_1, ..., T_call_N)   ← concurrent
                 + T_aggregation                 ← sequential, if judge used

Because the N calls run concurrently, wall-clock latency for the fan-out phase is approximately equal to the slowest individual call, not N times a single call. You multiply cost linearly with N but do not multiply latency linearly.
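The latency claim is easy to check by replacing the model calls with `asyncio.sleep`. The delays below are made up. The point is that `gather`'s wall-clock time tracks the slowest call (about 0.3 s here) rather than the sum (about 0.6 s). The sketch assumes, as is typical for API calls, that latency is dominated by waiting on the network.

```python
import asyncio
import time

async def fake_llm_call(delay: float) -> str:
    """Stand-in for a network-bound LLM call."""
    await asyncio.sleep(delay)
    return f"done after {delay}s"

async def fan_out(delays: list[float]) -> tuple[list[str], float]:
    start = time.perf_counter()
    results = await asyncio.gather(*[fake_llm_call(d) for d in delays])
    return list(results), time.perf_counter() - start

results, elapsed = asyncio.run(fan_out([0.1, 0.2, 0.3]))
print(f"{len(results)} calls in {elapsed:.2f}s")  # roughly 0.3s, not 0.6s
```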

Strategy        | Best For                       | Extra LLM Calls   | Extra Latency
----------------|--------------------------------|-------------------|-------------------
Majority Vote   | Discrete / classification      | None              | Near zero
Best-of-N Judge | Free-form text, ranked quality | +1 judge call     | +1 sequential call
Synthesis       | Comprehensive coverage needed  | +1 synthesis call | +1 sequential call

🎯 Key Principle: Use an ensemble when the quality gain on your specific task, measured on a held-out evaluation set, exceeds the cost multiplier. "It feels more robust" is not a sufficient decision criterion. In practice, ensembles show the clearest return when a single call has a meaningful error rate, the task runs at high volume, and the cost of a wrong answer exceeds the cost of the extra calls.
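The decision criterion above can be written as an expected-cost comparison per request: spend on model calls plus the expected cost of wrong answers. Every number in the example is hypothetical. Substitute your own error rates, measured on the held-out set, and your real prices.

```python
def expected_cost(calls: int, cost_per_call: float, error_rate: float, cost_per_error: float) -> float:
    """Spend on model calls plus the expected cost of wrong answers, per request."""
    return calls * cost_per_call + error_rate * cost_per_error

# Hypothetical numbers: $0.002 per call; a wrong answer costs $0.50 to fix downstream.
single = expected_cost(calls=1, cost_per_call=0.002, error_rate=0.08, cost_per_error=0.50)
# 3 generations + 1 judge call, with a (hypothetical) lower error rate.
ensemble = expected_cost(calls=4, cost_per_call=0.002, error_rate=0.05, cost_per_error=0.50)
print(f"single={single:.3f} ensemble={ensemble:.3f}")  # single=0.042 ensemble=0.033
```

With these numbers the ensemble wins. Shrink the error-rate gap (for example, an ensemble error rate of 7.5%) or make mistakes cheaper, and the same arithmetic favors the single call.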

Before adding an ensemble, spend the same token budget on a better single prompt. A well-structured prompt with explicit chain-of-thought, clear output constraints, and a few well-chosen examples will often match or exceed a naive ensemble of weaker prompts at lower cost. Ensembles are not a substitute for prompt quality; they are an amplifier applied after prompt quality has been maximized.

Relationship to Fan-Out/Fan-In

The ensemble is a specific use case of the fan-out/fan-in pattern. In general fan-out/fan-in, parallel branches process different subtasks — each branch handles a distinct piece of work. In an ensemble, parallel branches process the same task with variation — the goal is redundancy and aggregation rather than task decomposition. The mechanical implementation is identical: asyncio.gather fans out, an aggregator fans in. What differs is intent and therefore aggregation logic. The Fan-Out and Fan-In Workflows child lesson covers the broader parallel execution mechanics in detail.
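The "same mechanics, different intent" point can be sketched with one helper used both ways. This is a minimal illustration, assuming hypothetical stand-in coroutines (`summarize_section`, `classify`) in place of real LLM calls; only the aggregator changes between the decomposition case and the ensemble case.

```python
import asyncio
from collections import Counter
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def fan_out_fan_in(
    coros: list[Awaitable[T]],
    aggregate: Callable[[list[T]], R],
) -> R:
    """Run all branches concurrently, then hand the results to an aggregator."""
    results = await asyncio.gather(*coros)
    return aggregate(list(results))


# Hypothetical stand-ins for real LLM calls.
async def summarize_section(section: str) -> str:
    await asyncio.sleep(0)  # placeholder for network I/O
    return f"summary of {section}"


async def classify(text: str, seed: int) -> str:
    await asyncio.sleep(0)
    return "urgent" if seed != 1 else "routine"  # one dissenting branch


async def main() -> tuple[str, str]:
    # Decomposition: different subtasks, results joined together.
    sections = ["intro", "methods", "results"]
    report = await fan_out_fan_in(
        [summarize_section(s) for s in sections],
        aggregate=lambda parts: "\n".join(parts),
    )
    # Ensemble: the same task repeated with variation, results voted on.
    label = await fan_out_fan_in(
        [classify("Server is down", seed=i) for i in range(3)],
        aggregate=lambda votes: Counter(votes).most_common(1)[0][0],
    )
    return report, label


if __name__ == "__main__":
    print(asyncio.run(main()))
```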


Passing State and Context Between Steps

As a pipeline grows beyond two or three steps, the question of where data lives becomes as important as what each step does. Without a deliberate answer, state tends to scatter: one function returns a raw string, another expects a dict with specific keys, a third accumulates results in a module-level list. This section addresses the mechanics of keeping data consistent and accessible as it moves through a multi-step pipeline.

The Case for a Single Explicit State Object

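Pipeline state is the totality of information that accumulates as a pipeline executes: the original input, the output of every completed step, metadata like timestamps and step names, and any tool results collected along the way. Passing these as individual variables between functions breaks down quickly: every new step changes the signatures of the functions around it, and there is no single place to inspect what the pipeline knew at a given point.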

The concrete mechanism is a typed state object: a dataclass or Pydantic model that defines every field the pipeline may ever populate. Pydantic models are particularly well suited here because they provide runtime type validation, a clean .model_dump_json() path to serialization, and first-class support for optional fields that start None and get populated as the pipeline progresses.

from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

@dataclass
class StepRecord:
    """Captures the output and metadata for one completed pipeline step."""
    step_name: str
    completed_at: str          # ISO-8601 string — stays JSON-serializable
    output: Any
    token_count: int = 0

@dataclass
class PipelineState:
    """
    Single source of truth threaded through every pipeline step.
    Fields are populated incrementally; downstream steps treat earlier
    fields as read-only.
    """
    # ── Immutable input ──────────────────────────────────────────────
    user_input: str
    pipeline_id: str

    # ── Per-step outputs (populated as steps complete) ───────────────
    step_records: list[StepRecord] = field(default_factory=list)

    # ── Accumulated tool results ─────────────────────────────────────
    tool_results: list[dict[str, Any]] = field(default_factory=list)

    # ── Working fields written by individual steps ───────────────────
    research_summary: str | None = None
    draft_response:   str | None = None
    final_response:   str | None = None

🎯 Key Principle: A pipeline step should read exactly the fields it needs, write exactly the field it owns, and never touch anything else. If a step is tempted to modify another step's output, that's a signal to introduce an intermediate transformation step instead.

💡 Mental Model: Think of the state object as a lab notebook. The original hypothesis (user_input) never changes. Each experiment appends its findings. You wouldn't scribble over earlier entries; you append.

What to Include in Shared Context

A well-designed state object contains four categories:

  • Original user input — preserved verbatim in an immutable field. Later steps frequently need to refer back to exactly what the user asked.
  • Per-step outputs — each step appends a StepRecord rather than overwriting a single latest_output field. This matters for debugging, retry logic, and ensemble patterns where you need to compare outputs across branches.
  • Metadata — step name, completion timestamp, and token count at minimum. Store timestamps as ISO-8601 strings rather than datetime objects so the state remains JSON-serializable.
  • Accumulated tool results — raw results from external tool calls stored as plain dicts. Storing them separately from the step's interpretation lets you re-run the interpretation without re-incurring the tool call's latency or cost.

Context Window Budgeting

The most insidious failure mode in long pipelines is running out of context budget several steps in. Consider a step whose prompt combines a 2,000-token summary from the previous step, a 1,500-token tool result, and the original 800-token query: it starts with 4,300 tokens of context before its own instructions are added, and every later step adds more. The accumulation is fast and silent.

Context window budgeting means actively tracking how many tokens each step consumes and applying a compression strategy before accumulated context grows too large. Three practical strategies:

| Strategy | Best for | Information loss |
|---|---|---|
| Hard truncation (drop oldest) | Recency matters (e.g. chat) | Oldest content discarded, which may be important |
| LLM summarization of earlier steps | Complex research pipelines | Usually low, as long as the summary keeps the key points |
| Structured field extraction | Tool results / JSON outputs | None for retained fields; everything else is discarded |

Hard truncation includes only the last K tokens of prior context. Simple, but risks dropping crucial earlier information.

LLM summarization adds a lightweight summarization call when accumulated step outputs exceed a token threshold. The summary replaces the raw step records in the prompt — though the originals remain in step_records on the state object for auditability.

Structured field extraction retains only the fields downstream steps actually use. If step 2 extracted 40 fields from an API response but step 3 uses only 4, include only those 4 in step 3's prompt.

⚠️ Common Mistake: Assuming the model will gracefully handle being near its context limit. Quality degrades noticeably before the hard limit is reached. Treat roughly 80% of a model's nominal context limit as your effective ceiling — this is a rough guide, as the exact degradation point varies by model and task.
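The two strategies that need no extra model call, plus the rough 80% ceiling, can be sketched in a few lines. This is illustrative only: `approx_tokens` counts whitespace-separated words as an assumed stand-in for your model's real tokenizer, and `truncate_oldest` drops whole step outputs rather than cutting mid-text, a coarser variant of "keep the last K tokens". LLM summarization would replace the truncation call with a call to your own summarizer once the budget is exceeded.

```python
def approx_tokens(text: str) -> int:
    """Crude token estimate: whitespace-separated words.

    An assumption for illustration only; use your model's tokenizer in practice.
    """
    return len(text.split())


def truncate_oldest(step_outputs: list[str], budget: int) -> list[str]:
    """Hard truncation: keep the most recent outputs that fit within the budget."""
    kept: list[str] = []
    used = 0
    for output in reversed(step_outputs):  # walk newest first
        cost = approx_tokens(output)
        if used + cost > budget:
            break
        kept.append(output)
        used += cost
    return list(reversed(kept))  # restore chronological order


def extract_fields(record: dict, needed: tuple[str, ...]) -> dict:
    """Structured field extraction: forward only the keys the next step uses."""
    return {key: record[key] for key in needed if key in record}


def effective_ceiling(nominal_limit: int, headroom: float = 0.8) -> int:
    """Rough working ceiling below the model's nominal context limit."""
    return int(nominal_limit * headroom)
```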

Serialization at Every Step Boundary

Production pipelines rarely stay in a single process. Steps run as async tasks; a slow step hands off via a message queue; a sub-pipeline runs in a separate service. At any of these boundaries, the state object must cross a serialization gap.

🎯 Key Principle: Design pipeline state to be JSON-serializable at every step boundary, even if you don't currently cross one. The cost of this discipline is negligible; the cost of retrofitting it when you add a queue or subprocess is high.

Concrete implications:

  • No datetime objects — store as ISO-8601 strings.
  • No custom class instances in tool_results — use plain dicts, lists, strings, numbers, booleans.
  • No unserializable enums — use str-based enums (class StepStatus(str, Enum)).
  • No bytes or file handles — store a reference (file path or storage URL) rather than content.
# ❌ Storing the raw SDK response object: not serializable
state.tool_results.append(response)

# ✅ Extracting to a plain dict: always serializable
state.tool_results.append({
    "source": "web_search",
    "query": search_query,
    "results": [
        {"title": r.title, "url": r.url, "snippet": r.snippet}
        for r in response.results
    ],
    "retrieved_at": datetime.now(timezone.utc).isoformat(),
})
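One cheap way to enforce these rules before you ever add a queue is to round-trip the state through JSON at each boundary, for example in a unit test per step. The sketch below uses only the standard library; `BoundaryState`, `StepStatus`, `to_wire`, and `from_wire` are hypothetical names chosen for illustration.

```python
import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any


class StepStatus(str, Enum):
    """str-based enum: json.dumps writes it out as its plain string value."""
    PENDING = "pending"
    DONE = "done"


@dataclass
class BoundaryState:
    user_input: str
    status: StepStatus = StepStatus.PENDING
    tool_results: list[dict[str, Any]] = field(default_factory=list)


def to_wire(state: BoundaryState) -> str:
    """Serialize at a step boundary; raises TypeError on non-JSON values."""
    return json.dumps(asdict(state))


def from_wire(payload: str) -> BoundaryState:
    data = json.loads(payload)
    data["status"] = StepStatus(data["status"])  # string value back to enum member
    return BoundaryState(**data)
```

If `from_wire(to_wire(state)) == state` holds after every step, the state can cross a process or queue boundary later without a retrofit; a raw SDK object or a `bytes` payload in `tool_results` makes `to_wire` fail immediately instead of in production.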

Code Walkthrough: Threading State Through Three Steps

import asyncio
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

@dataclass
class StepRecord:
    step_name: str
    completed_at: str
    output: Any
    token_count: int = 0

@dataclass
class PipelineState:
    user_input: str
    pipeline_id: str
    step_records: list[StepRecord] = field(default_factory=list)
    tool_results: list[dict[str, Any]] = field(default_factory=list)
    research_summary: str | None = None
    draft_response:   str | None = None
    final_response:   str | None = None

def record_step(state: PipelineState, step_name: str, output: Any, tokens: int = 0) -> None:
    state.step_records.append(StepRecord(
        step_name=step_name,
        completed_at=datetime.now(timezone.utc).isoformat(),
        output=output,
        token_count=tokens,
    ))

async def step_research(state: PipelineState) -> PipelineState:
    """
    Reads:  state.user_input
    Writes: state.research_summary, state.tool_results
    """
    simulated_search_result = {
        "source": "web_search",
        "query": state.user_input,
        "results": [{"title": "Overview of topic", "snippet": "Key facts..."}],
        "retrieved_at": datetime.now(timezone.utc).isoformat(),
    }
    state.tool_results.append(simulated_search_result)
    summary = f"Research summary for: '{state.user_input}'. Found 1 relevant source."
    state.research_summary = summary
    record_step(state, step_name="research", output=summary, tokens=120)
    return state

async def step_draft(state: PipelineState) -> PipelineState:
    """
    Reads:  state.user_input, state.research_summary
    Writes: state.draft_response
    """
    draft = (
        f"Based on the research, here is a draft answer to '{state.user_input}': "
        f"[draft content drawn from: {state.research_summary}]"
    )
    state.draft_response = draft
    record_step(state, step_name="draft", output=draft, tokens=340)
    return state

async def step_refine(state: PipelineState) -> PipelineState:
    """
    Reads:  state.draft_response
    Writes: state.final_response
    """
    final = f"[Refined] {state.draft_response}"
    state.final_response = final
    record_step(state, step_name="refine", output=final, tokens=280)
    return state

async def run_pipeline(user_input: str) -> PipelineState:
    state = PipelineState(
        user_input=user_input,
        pipeline_id=str(uuid.uuid4()),
    )
    state = await step_research(state)
    state = await step_draft(state)
    state = await step_refine(state)
    return state

async def main() -> None:
    state = await run_pipeline("What are the main approaches to context window management?")
    print(f"Pipeline ID : {state.pipeline_id}")
    print(f"Final answer: {state.final_response}")
    print(f"Steps completed: {[r.step_name for r in state.step_records]}")
    print(f"Total tokens used: {sum(r.token_count for r in state.step_records)}")

if __name__ == "__main__":
    asyncio.run(main())

Key design choices: each step returns the state object it received, making data flow explicit in run_pipeline. Steps read named fields, not positional arguments — if you later add a field to PipelineState, existing steps are unaffected. record_step is a shared helper ensuring the audit trail is consistent regardless of which step ran.

⚠️ Common Mistake: Letting a step reach into a field it doesn't own and modify it "just this once." This creates implicit coupling between steps and leads to race conditions when parallel branches are added. Write fields in exactly one place.

This discipline becomes critical when pipelines are retried after failure. If step_refine fails and you replay from step_draft, the state object should be fully recoverable from its serialized form. Any step that mutated a field it didn't own produces a state object inconsistent with the documented schema — and the retry behaves differently from the original run.
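A minimal checkpoint-and-resume sketch makes the retry scenario concrete. It assumes a plain dict as a stand-in for a durable store (a file, database row, or queue message in practice), and `ResumableState`, `run_with_checkpoints`, and `resume` are hypothetical names. Because each step writes only its own field and completed step names are recorded, replaying from the last checkpoint skips finished work and reproduces the same state.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from typing import Callable


@dataclass
class ResumableState:
    user_input: str
    completed: list[str] = field(default_factory=list)
    draft_response: str | None = None
    final_response: str | None = None


Step = Callable[[ResumableState], ResumableState]


def draft(state: ResumableState) -> ResumableState:
    state.draft_response = f"draft for {state.user_input}"  # owns draft_response only
    return state


def refine(state: ResumableState) -> ResumableState:
    state.final_response = f"[Refined] {state.draft_response}"  # owns final_response only
    return state


STEPS: list[tuple[str, Step]] = [("draft", draft), ("refine", refine)]


def run_with_checkpoints(state: ResumableState, store: dict[str, str]) -> ResumableState:
    """Skip steps already recorded as complete; checkpoint after each new one."""
    for name, step in STEPS:
        if name in state.completed:
            continue
        state = step(state)
        state.completed.append(name)
        store["latest"] = json.dumps(asdict(state))  # stand-in for a durable write
    return state


def resume(store: dict[str, str]) -> ResumableState:
    return ResumableState(**json.loads(store["latest"]))


if __name__ == "__main__":
    store: dict[str, str] = {}
    partial = draft(ResumableState("What is context budgeting?"))
    partial.completed.append("draft")
    store["latest"] = json.dumps(asdict(partial))  # pretend refine crashed here
    print(run_with_checkpoints(resume(store), store).final_response)
```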

Choosing Between Dataclasses and Pydantic

| Feature | dataclass | Pydantic BaseModel |
|---|---|---|
| Runtime validation | Not built-in | At construction; on assignment if validate_assignment=True is set |
| JSON serialization | Manual (asdict + json.dumps) | .model_dump_json() built-in |
| Schema export | Manual | .model_json_schema() built-in |
| Dependency | stdlib | Third-party (pydantic) |

For pipelines that stay in-process and are iterated quickly, dataclass keeps dependencies minimal. For pipelines that cross service boundaries or expose their schema to other systems, Pydantic earns its dependency.

🧠 Mnemonic: S.O.A.P.: Single state object, One field per step, Append, don't overwrite, Plain serializable types.


Common Pitfalls in Pipeline Design

Building a multi-step pipeline is an exercise in optimism: you design each step to succeed, wire them together, and run tests that confirm the happy path. Then you deploy, and over the course of real workloads you discover that failure modes are not where you expected them. Errors hide between steps rather than inside them. State accumulates in ways prompts were never designed to handle. Retries re-execute side effects. The five pitfalls here each describe a class of failure that surfaces predictably once a pipeline moves beyond toy examples.

Pitfall 1: Error Propagation Without Validation Gates

Error propagation is what happens when a bad output at step N is passed, unchanged, as the input to step N+1. Because individual steps rarely fail loudly — they return something rather than throwing an exception — malformed or low-confidence outputs travel silently through the pipeline until the final answer is visibly wrong.

User document
     │
     ▼
┌─────────────┐
│  Step 1     │  ← Extracts structured data
│  (Extract)  │    Output: { revenue: null, ... }  ← malformed
└──────┬──────┘
       │  (no validation gate — bad output passes through)
       ▼
┌─────────────┐
│  Step 2     │  ← Summarizes extracted data
│  (Summarize)│    Output: summary omits revenue   ← corrupted
└──────┬──────┘
       │
       ▼
┌─────────────┐
│  Step 3     │  ← Generates recommendations
│  (Recommend)│    Output: wrong recommendations   ← silently wrong
└─────────────┘

The fix is a validation gate between steps: a lightweight check that asserts the output of step N meets the structural and semantic requirements of step N+1 before it is forwarded.

from pydantic import BaseModel, ValidationError

class ExtractedData(BaseModel):
    company_name: str
    revenue: float          # required; None would be a validation failure
    year: int

def validate_step_output(raw_output: dict) -> ExtractedData:
    """
    Validation gate between step 1 and step 2.
    Raises ValueError if the output is malformed,
    preventing the bad value from reaching downstream steps.
    """
    try:
        return ExtractedData(**raw_output)
    except ValidationError as exc:
        raise ValueError(
            f"Step 1 output failed validation — halting pipeline: {exc}"
        ) from exc

# In the pipeline runner (run_extraction_step and run_summary_step stand in
# for your own step functions):
raw = run_extraction_step(document)
validated = validate_step_output(raw)   # ← gate fires before step 2 runs
summary = run_summary_step(validated)

⚠️ Common Mistake: Treating LLM output as inherently trustworthy because it returned a 200 status code. A model can produce syntactically valid JSON that is semantically garbage. Validation gates must check content, not just structure.
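A schema check like the Pydantic model above catches missing or mistyped fields; a semantic check catches values that parse fine but cannot be right. The sketch below uses only the standard library, and its three rules (non-negative revenue, a plausible year, and the company name actually appearing in the source) are illustrative assumptions, not a general recipe. `Extracted`, `semantic_check`, and `gate` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Extracted:
    company_name: str
    revenue: float
    year: int


def semantic_check(data: Extracted, source_document: str) -> list[str]:
    """Return a list of problems; an empty list means the gate passes.

    The rules below are examples; choose checks that reflect your domain.
    """
    problems: list[str] = []
    if data.revenue < 0:
        problems.append(f"revenue is negative: {data.revenue}")
    if not 1900 <= data.year <= 2100:
        problems.append(f"year out of plausible range: {data.year}")
    # Grounding check: the extracted name should actually appear in the input.
    if data.company_name.lower() not in source_document.lower():
        problems.append(f"company name not found in source: {data.company_name!r}")
    return problems


def gate(data: Extracted, source_document: str) -> Extracted:
    """Forward the data unchanged if it passes; otherwise halt the pipeline."""
    problems = semantic_check(data, source_document)
    if problems:
        raise ValueError("Semantic validation failed: " + "; ".join(problems))
    return data
```

Returning every problem at once, rather than raising on the first, makes the failure message useful when you later feed it back to the model for a corrective retry.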

Pitfall 2: Prompt Bleed

Prompt bleed occurs when the full accumulated context — every prior step's inputs and outputs, every tool result, the entire conversation history — is injected wholesale into each subsequent prompt. The practical result is that later steps receive thousands of tokens of material most of which is irrelevant, and the model's attention is divided across that noise.

❌ Prompt bleed pattern:

  Step 5 prompt receives:
  ┌───────────────────────────────────┐
  │ [Original user message]           │  ← needed
  │ [Step 1 full output - 400 tokens] │  ← irrelevant
  │ [Step 2 full output - 600 tokens] │  ← irrelevant
  │ [Step 3 full output - 300 tokens] │  ← needed
  │ [Step 4 full output - 500 tokens] │  ← irrelevant
  │ [Current task instruction]        │  ← needed
  └───────────────────────────────────┘
  Total: ~2000 tokens, ~500 of which are relevant

✅ Filtered context pattern:

  Step 5 prompt receives:
  ┌───────────────────────────────────┐
  │ [Original user message]           │  ← needed
  │ [Step 3 output - 300 tokens]      │  ← needed
  │ [Current task instruction]        │  ← needed
  └───────────────────────────────────┘
  Total: ~500 tokens, all relevant

The solution is context filtering: each step's prompt template explicitly declares which prior outputs it requires, and the pipeline runner assembles only those pieces.

from dataclasses import dataclass
from typing import Optional

@dataclass
class PipelineState:
    original_query: str
    extracted_data: Optional[dict] = None
    summary: Optional[str] = None
    recommendations: Optional[str] = None

def build_recommendation_prompt(state: PipelineState) -> str:
    """
    Only pulls the fields this step actually needs.
    Does NOT inject state.extracted_data or any other prior output.
    """
    return (
        f"User request: {state.original_query}\n\n"
        f"Summary of findings:\n{state.summary}\n\n"
        f"Generate three specific recommendations based on the summary above."
    )

💡 Pro Tip: If you find yourself writing str(state) or json.dumps(vars(state)) to build a prompt, that is a strong signal that prompt bleed is about to occur. Prompt construction should be a deliberate curation of relevant fields, not a serialization of the entire state object.
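Declaring dependencies as data, rather than hard-coding them in each prompt builder, lets the runner check and enforce them in one place. This is a sketch under assumptions: `StepSpec` and `assemble_context` are hypothetical names, and the `PipelineState` from the example above is repeated so the block runs on its own.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PipelineState:
    original_query: str
    extracted_data: Optional[dict] = None
    summary: Optional[str] = None
    recommendations: Optional[str] = None


@dataclass(frozen=True)
class StepSpec:
    name: str
    requires: tuple[str, ...]   # the only state fields this step may see
    instruction: str


def assemble_context(state: PipelineState, spec: StepSpec) -> str:
    """Build a prompt from only the fields the step declares."""
    missing = [f for f in spec.requires if getattr(state, f) is None]
    if missing:
        raise ValueError(f"{spec.name} is missing required inputs: {missing}")
    sections = [f"{f}:\n{getattr(state, f)}" for f in spec.requires]
    return "\n\n".join(sections + [spec.instruction])


RECOMMEND = StepSpec(
    name="recommend",
    requires=("original_query", "summary"),
    instruction="Generate three specific recommendations based on the summary above.",
)
```

A misspelled field name in `requires` raises `AttributeError` and a not-yet-populated one raises `ValueError`, so dependency mistakes surface before any tokens are spent.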

Pitfall 3: Mistaking Steps for Stateless Functions

Pipeline steps that make side-effecting tool calls, write to databases, send emails, or charge payment methods are non-idempotent: running them twice causes two external actions rather than one. The problem surfaces on retries, when a step fails partway through and the pipeline runner retries the entire step.

Timeline of a non-idempotent retry failure:

Attempt 1:
  [LLM call] ──► success
  [Write order to DB] ──► success  (order_id: 42 created)
  [Return result] ──► network timeout ← failure here

Pipeline runner retries step:

Attempt 2:
  [LLM call] ──► success
  [Write order to DB] ──► success  (order_id: 43 created ← duplicate!)
  [Return result] ──► success

The fix requires two complementary approaches. First, idempotency keys: before executing a side effect, check whether a record with the current run ID and step ID already exists.

import uuid

def write_order_idempotent(db, run_id: str, step_id: str, order_data: dict) -> dict:
    """
    Checks for an existing record before writing.
    Safe to call multiple times with the same run_id + step_id.
    """
    existing = db.orders.find_one({"run_id": run_id, "step_id": step_id})
    if existing:
        return existing["order"]

    # Check-then-insert is not atomic: back it with a unique index on
    # (run_id, step_id) so two concurrent retries cannot both insert.
    order = {**order_data, "order_id": str(uuid.uuid4())}
    db.orders.insert_one({"run_id": run_id, "step_id": step_id, "order": order})
    return order

Second, separate the LLM reasoning step from the side-effect execution step. A step that calls the model to decide what to write and then writes it combines two concerns. Splitting them makes each piece easier to make idempotent and easier to test independently.
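The split might look like the sketch below. `OrderDecision`, `decide_order`, and `InMemoryOrders` are hypothetical names, and the in-memory dict is an assumed stand-in for a real database. The reasoning step has no side effects and can be retried freely; only `execute` touches the store, and it is keyed by run and step ID.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class OrderDecision:
    """Output of the reasoning step: what to do, with no side effects yet."""
    sku: str
    quantity: int


def decide_order(llm_output: dict) -> OrderDecision:
    # Stand-in for parsing and validating the model's structured output.
    return OrderDecision(sku=llm_output["sku"], quantity=int(llm_output["quantity"]))


class InMemoryOrders:
    """Toy store keyed by (run_id, step_id). A real database would enforce
    this key with a unique constraint so concurrent retries cannot both insert."""

    def __init__(self) -> None:
        self._by_key: dict[tuple[str, str], dict] = {}

    def execute(self, run_id: str, step_id: str, decision: OrderDecision) -> dict:
        key = (run_id, step_id)
        if key not in self._by_key:  # only the first attempt performs the write
            self._by_key[key] = {
                "order_id": str(uuid.uuid4()),
                "sku": decision.sku,
                "quantity": decision.quantity,
            }
        return self._by_key[key]  # retries get the original record back

    def count(self) -> int:
        return len(self._by_key)
```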

Pitfall 4: Hardcoded Step Counts in Fan-Out Logic

A common implementation mistake is hardcoding the number of parallel branches: run_step_a(), run_step_b(), run_step_c(). This breaks in exactly the scenarios where fan-out is most useful, because those scenarios involve a dynamic number of tasks determined at runtime.

❌ Hardcoded fan-out (breaks when N ≠ 3):

  decompose_query()
       │
  ┌────┼────┐
  ▼    ▼    ▼
 sub1 sub2 sub3    ← what happens to sub4...sub8?

✅ Dynamic fan-out (handles any N):

  decompose_query()  →  [sub1, sub2, ..., subN]
          │
          ▼
  asyncio.gather(*[process(task) for task in tasks])
          │
          ▼
     [result1, result2, ..., resultN]

import asyncio
from typing import List

async def process_subtask(subtask: str) -> str:
    ...  # placeholder: one LLM call per subtask

async def run_dynamic_fanout(subtasks: List[str]) -> List[str]:
    """Handles any number of subtasks without modification."""
    results = await asyncio.gather(
        *[process_subtask(task) for task in subtasks]
    )
    return list(results)

async def pipeline(query: str) -> str:
    # decompose_query and synthesize are placeholders for your own async steps
    subtasks: List[str] = await decompose_query(query)
    results: List[str] = await run_dynamic_fanout(subtasks)
    return await synthesize(results)

Unbounded asyncio.gather over a large task list can exhaust rate limits. Use a semaphore to cap concurrent executions:

async def run_fanout_with_limit(
    subtasks: List[str],
    max_concurrent: int = 5
) -> List[str]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded_process(task: str) -> str:
        async with semaphore:
            return await process_subtask(task)

    return list(await asyncio.gather(
        *[bounded_process(t) for t in subtasks]
    ))
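To see the cap in action, the sketch below instruments the same semaphore pattern with a counter of in-flight tasks. It assumes `asyncio.sleep` as a stand-in for the LLM call, and `run_fanout_measured` is a hypothetical name; results still come back in input order because `asyncio.gather` preserves it.

```python
import asyncio


async def run_fanout_measured(
    subtasks: list[str], max_concurrent: int = 5
) -> tuple[list[str], int]:
    """Bounded fan-out that also reports the peak number of concurrent tasks."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def bounded_process(task: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for an LLM call
            in_flight -= 1
            return task.upper()

    results = await asyncio.gather(*[bounded_process(t) for t in subtasks])
    return list(results), peak


if __name__ == "__main__":
    results, peak = asyncio.run(run_fanout_measured([f"t{i}" for i in range(12)], 3))
    print(results, peak)
```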

💡 Mental Model: Treat the number of parallel branches as an input to the pipeline, not a constant baked into the code. If you cannot answer "what happens when this number changes to N+1" without modifying the pipeline logic, the fan-out is hardcoded.

Pitfall 5: Missing Observability

A pipeline that produces a wrong final answer presents a debugging challenge qualitatively different from a wrong answer in a single-call system. The wrong answer could have originated at any step, been amplified at another, and arrived at the output looking fluent and complete. Without a record of what happened at each step, debugging is archaeology.

Observability means capturing, for every step execution: the inputs passed to the step, the raw output returned, the latency of the LLM call, and the token usage. This quadruple is the minimum necessary to reconstruct what happened after the fact.

Per-step observability record:

┌──────────────────────────────────────────────┐
│ Run ID:      run_abc123                      │
│ Step:        extract_entities (step 2 of 5)  │
│ Timestamp:   2026-03-14T10:22:31Z            │
│ Latency:     1.4s                            │
│ Tokens in:   847                             │
│ Tokens out:  203                             │
│ Input:       { "document": "..." }           │
│ Output:      { "entities": [...] }           │
│ Status:      success                         │
└──────────────────────────────────────────────┘

🤔 Did you know? Token usage per step is not just a cost metric — it is a diagnostic signal. A step that consistently uses far fewer output tokens than expected may be truncating its response. A step using far more than expected may be emitting verbose reasoning that muddies downstream context. Patterns in token counts often reveal structural problems before the final output does.

import time
import logging
from typing import Callable, Any
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class StepTrace:
    run_id: str
    step_name: str
    inputs: Any
    outputs: Any
    latency_seconds: float
    tokens_in: int
    tokens_out: int
    status: str  # "success" or "error"
    error_message: str = ""

def traced_step(
    run_id: str,
    step_name: str,
    fn: Callable,
    inputs: Any,
    token_counter: Any,  # caller-supplied object exposing .last_input / .last_output
) -> Any:
    """
    Wraps any pipeline step with observability.
    Records inputs, outputs, latency, and tokens regardless
    of whether the step succeeds or fails.
    """
    start = time.monotonic()
    try:
        result = fn(inputs)
        trace = StepTrace(
            run_id=run_id, step_name=step_name, inputs=inputs, outputs=result,
            latency_seconds=time.monotonic() - start,
            tokens_in=token_counter.last_input, tokens_out=token_counter.last_output,
            status="success"
        )
        logger.info("step_trace", extra={"trace": trace.__dict__})
        return result
    except Exception as exc:
        trace = StepTrace(
            run_id=run_id, step_name=step_name, inputs=inputs, outputs=None,
            latency_seconds=time.monotonic() - start,
            tokens_in=token_counter.last_input, tokens_out=0,
            status="error", error_message=str(exc)
        )
        logger.error("step_trace", extra={"trace": trace.__dict__})
        raise

The trace is emitted regardless of whether the step succeeds or fails — a failure trace is often more valuable than a success trace.
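The wrapper above is synchronous, while the steps earlier in this lesson are `async`. An async variant can be sketched as a decorator. This is an assumption-laden sketch: `traced`, `StepResult`, and the in-memory `TRACES` list are hypothetical (swap `TRACES.append` for your logger or tracing backend), and it assumes steps report their own token usage through `StepResult`.

```python
import asyncio
import functools
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

TRACES: list[dict[str, Any]] = []  # stand-in for a log sink or tracing backend


@dataclass
class StepResult:
    """Hypothetical convention: steps report their own token usage."""
    output: Any
    tokens_in: int = 0
    tokens_out: int = 0


def traced(step_name: str) -> Callable:
    def decorator(fn: Callable[..., Awaitable[StepResult]]) -> Callable[..., Awaitable[StepResult]]:
        @functools.wraps(fn)
        async def wrapper(run_id: str, inputs: Any) -> StepResult:
            start = time.monotonic()
            record: dict[str, Any] = {"run_id": run_id, "step_name": step_name, "inputs": inputs}
            try:
                result = await fn(run_id, inputs)
            except Exception as exc:
                record.update(status="error", error_message=str(exc), outputs=None)
                raise
            else:
                record.update(status="success", outputs=result.output,
                              tokens_in=result.tokens_in, tokens_out=result.tokens_out)
                return result
            finally:
                record["latency_seconds"] = time.monotonic() - start
                TRACES.append(record)  # emitted on success and on failure
        return wrapper
    return decorator


@traced("extract_entities")
async def extract_entities(run_id: str, document: str) -> StepResult:
    await asyncio.sleep(0)  # stand-in for an LLM call
    if not document:
        raise ValueError("empty document")
    return StepResult(output={"entities": document.split()[:2]}, tokens_in=12, tokens_out=4)
```

The `finally` clause is what guarantees a trace for failed steps as well as successful ones, while the original exception still propagates to the caller.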

⚠️ Common Mistake: Logging only the final pipeline output. This tells you the pipeline failed but not where or why. Log at every step boundary.

Connecting the Pitfalls

These five failure modes interact. A pipeline missing validation gates (pitfall 1) combined with prompt bleed (pitfall 2) means a bad output propagates and accumulates in later prompts. A pipeline without observability (pitfall 5) on top of non-idempotent steps (pitfall 3) means that on retry, you duplicate side effects with no record of which attempt produced which.

| Pitfall | Root cause | Fix |
|---|---|---|
| Error propagation | No validation between steps | Add schema + semantic validation gates |
| Prompt bleed | Injecting full state into prompts | Filter context per step by declared dependencies |
| Non-idempotent steps | Side effects re-run on retry | Idempotency keys; separate reasoning from action |
| Hardcoded fan-out | Fixed branch count in code | Iterate over dynamic task lists; cap with semaphore |
| Missing observability | No per-step logging | Trace wrapper: inputs, outputs, latency, tokens |

The common thread across all five: pipeline steps are treated as if they operate in isolation, when in fact they form a system where the assumptions of each step depend on the guarantees of the step before it.


Key Takeaways and What Comes Next

You started this lesson without a shared vocabulary for describing how LLM-based pipelines are structured. You finish it with four named shapes — sequential chain, router, ensemble, and fan-out/fan-in — and a set of engineering constraints that apply to all of them.

The Four Shapes Are Composable Primitives

Real pipelines are almost always combinations of the four shapes, not instances of exactly one:

User input
    │
    ▼
┌─────────────────────────────┐
│  Router                     │  ← classifies intent
└──────┬──────────┬───────────┘
       │          │
  simple path  complex path
       │          │
       ▼          ▼
  single LLM   Fan-Out
  call         ├── sub-agent A
               ├── sub-agent B
               └── sub-agent C
                       │
                  Fan-In (ensemble)
                  majority vote
                       │
                       ▼
              Sequential cleanup step
                       │
                       ▼
                  Final output

This pipeline uses all four shapes. No single shape is sufficient on its own.
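The diagram translates almost line for line into code. The sketch below uses hypothetical stand-in coroutines in place of real model calls (a word-count heuristic plays the router, and one sub-agent deliberately dissents), so only the control flow, not the logic inside each step, reflects the pattern.

```python
import asyncio
from collections import Counter


# Hypothetical stand-ins for model calls; replace with real clients.
async def classify_intent(text: str) -> str:
    return "complex" if len(text.split()) > 8 else "simple"


async def answer_directly(text: str) -> str:
    return f"direct answer: {text}"


async def sub_agent(text: str, variant: int) -> str:
    await asyncio.sleep(0)  # placeholder for network I/O
    return "answer A" if variant != 2 else "answer B"  # one dissenting agent


async def cleanup(text: str) -> str:
    return text.strip().capitalize()


async def pipeline(user_input: str) -> str:
    route = await classify_intent(user_input)               # router
    if route == "simple":
        return await answer_directly(user_input)            # single LLM call
    candidates = await asyncio.gather(                       # fan-out
        *[sub_agent(user_input, v) for v in range(3)]
    )
    winner = Counter(candidates).most_common(1)[0][0]       # fan-in: majority vote
    return await cleanup(winner)                            # sequential cleanup step


if __name__ == "__main__":
    print(asyncio.run(pipeline("hi")))
```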

| Shape | When to use it | Primary cost | Composes well with |
|---|---|---|---|
| Sequential chain | Each step depends on the previous output | Latency accumulates per step | Everything; it is the backbone of most pipelines |
| Router | Different inputs need meaningfully different processing paths | One extra classification call | Sequential chains and fan-out as downstream branches |
| Ensemble | Output quality variance is high; accuracy gain justifies N× token cost | N× token spend; aggregation overhead | Fan-out/fan-in (ensemble is a specific fan-in strategy) |
| Fan-out / fan-in | Sub-tasks are independent and can run concurrently | Coordination overhead; state serialization | Router (to determine what to fan out) and ensemble (at fan-in) |

💡 Mental Model: Think of the four shapes the way you think of control flow primitives — sequence, branch, loop, and parallel. The question is never "which shape does this pipeline use?" but "which shapes does this pipeline need, and where?"

The Non-Negotiables

Typed state objects, per-step validation, and structured logging are the floor below which correctness cannot be guaranteed for any pipeline that runs in a setting where correctness matters.

from dataclasses import asdict, dataclass, field
from typing import Any
import json

@dataclass
class PipelineState:
    """Single source of truth for the entire pipeline run."""
    run_id: str
    original_input: str
    route: str | None = None
    step_outputs: dict[str, Any] = field(default_factory=dict)
    final_output: str | None = None
    errors: list[str] = field(default_factory=list)

    def record_step(self, step_name: str, output: Any) -> None:
        self.step_outputs[step_name] = output

    def to_json(self) -> str:
        # asdict picks up every field, so a newly added field is never silently dropped
        return json.dumps(asdict(self))

⚠️ The critical point: The cost of skipping per-step validation is not a failed run. It is a successful-looking run that produces a wrong answer. A pipeline that errors loudly is far easier to debug than one that silently propagates a malformed intermediate output through four more steps.

Measurable Criteria, Not Intuition

One of the most common pipeline design failures is adding structural complexity — routing, ensembles, additional steps — because it feels more robust, without any measurement to confirm it actually is.

❌ Wrong thinking: "Adding an ensemble here makes the output more reliable."
✅ Correct thinking: "Our benchmark shows the single-call path has a 14% error rate on ambiguous inputs. Running a 3-way ensemble with majority vote drops that to 6%. The latency increase is within our budget. We add the ensemble."

For routing decisions: does routing to a specialized sub-chain produce measurably better outputs than a single general-purpose prompt? Build an eval set, run both approaches, and measure.

For ensemble decisions: what is the accuracy gain per additional generation, and at what N does the marginal gain fall below the marginal cost? Run the ensemble at N=2, 3, and 5 on your eval set and plot the curve.
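A small harness for producing that curve might look like the sketch below. It assumes a majority-vote ensemble and a `classify(input, sample_index)` callable that you supply; `accuracy_by_n`, `marginal_gains`, and `simulated_classifier` are hypothetical names, and the simulated classifier exists only to exercise the harness, not to suggest real accuracy figures. N=1 gives the single-call baseline, and odd values of N avoid ties under majority vote.

```python
import random
from collections import Counter
from typing import Callable


def majority_vote(labels: list[str]) -> str:
    return Counter(labels).most_common(1)[0][0]


def accuracy_by_n(
    eval_set: list[tuple[str, str]],          # (input, expected_label) pairs
    classify: Callable[[str, int], str],      # (input, sample_index) -> label
    ns: tuple[int, ...] = (1, 3, 5, 7),
) -> dict[int, float]:
    """Accuracy of an N-way majority-vote ensemble for each N on the eval set."""
    curve: dict[int, float] = {}
    for n in ns:
        correct = sum(
            majority_vote([classify(x, i) for i in range(n)]) == expected
            for x, expected in eval_set
        )
        curve[n] = correct / len(eval_set)
    return curve


def marginal_gains(curve: dict[int, float]) -> dict[int, float]:
    """Accuracy gained per extra call when moving from the previous N to this N."""
    ns = sorted(curve)
    return {b: (curve[b] - curve[a]) / (b - a) for a, b in zip(ns, ns[1:])}


def simulated_classifier(x: str, i: int) -> str:
    """Fake model for trying the harness: returns the input label ~70% of the time."""
    rng = random.Random(f"{x}:{i}")  # deterministic per (input, sample)
    return x if rng.random() < 0.7 else "wrong"
```

Compare each entry of `marginal_gains` with the cost of one extra call; the N where the gain drops below that cost is where the ensemble stops paying for itself.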

💡 Pro Tip: Build your eval set before you build your pipeline. The eval set forces you to operationalize what "correct" means for your task, which will clarify design decisions far more than any structural pattern can.

Where to Go Next

This lesson established the structural vocabulary and the engineering constraints. Two child lessons go deeper on the parts that deserve more than an introduction.

Prompt Chaining Patterns covers what this lesson intentionally left underspecified: how to design and sequence the individual prompts that live inside each step. The structural vocabulary you now have tells you where prompts go in a pipeline; the child lesson tells you how to write them so that outputs from one step are maximally useful as inputs to the next — including output format contracts between steps, decomposing complex reasoning into sequences of narrower prompts, and handling the failure mode where a prompt produces valid output that is structurally incompatible with the next step's expected input.

Fan-Out and Fan-In Workflows covers the parallel execution mechanics that this lesson described only at the level of asyncio.gather. If you are building pipelines where sub-tasks run concurrently — for ensemble generation, parallel document processing, or distributed sub-agent execution — that lesson covers what matters in production: dynamic fan-out where task count is determined at runtime, fan-in aggregation strategies beyond majority vote, error handling when a subset of parallel tasks fail, and state serialization requirements for pipelines that span process or machine boundaries.