Prompt Chaining & Parallel Workflows

Build multi-step agent pipelines using sequential chains, routers, fan-out/fan-in, and ensemble patterns.

Why Multi-Step Agent Pipelines Change Everything

If you've ever asked an AI to do something genuinely complex — summarize a 50-page report, debug a multi-file codebase, or draft a complete marketing campaign — you've probably felt the gap between what you hoped would happen and what actually did. The output was vague, missed key details, or confidently fabricated something plausible but wrong. The frustrating truth is that the problem usually isn't the model's raw capability. It's the approach: a single, monolithic prompt asking a language model to do too much in one shot. That's the exact problem multi-step agent pipelines were built to solve, and understanding why they work the way they do will change how you think about building AI systems entirely.

This lesson is about orchestration — the art and engineering of coordinating multiple AI calls, tools, and decision points into coherent, production-grade workflows. By the end of this section, you'll understand why the single-prompt approach breaks down at scale, how composing discrete steps creates systems that are actually debuggable, and what the landscape of pipeline patterns looks like before we dive into each one in depth.

The Illusion of the Magic Single Prompt

There's a seductive idea that the right prompt, perfectly crafted, can do anything. And for simple tasks — translating a sentence, writing a haiku, answering a factual question — that's often true. But real-world software development tasks rarely fit this mold. Consider what actually happens when you ask a model to "analyze this codebase, identify security vulnerabilities, suggest fixes, and write unit tests for each fix."

The model is being asked to do several cognitively distinct things simultaneously:

🔧 Comprehension — understand what the code does
🎯 Analysis — reason about security implications across many lines
📚 Prioritization — rank vulnerabilities by severity
🧠 Generation — write corrected code
🔒 Validation — write tests that actually cover the fix

Each of these is a separate cognitive task. Humans don't do them simultaneously — they work through them sequentially, often cycling back. When a model tries to do all five at once, it makes shortcuts. Context windows fill up with competing signals. Early reasoning errors propagate forward. The output looks plausible but contains subtle failures that are nearly impossible to trace back to their source.

🤔 Did you know? Research on chain-of-thought prompting shows that forcing a model to reason through intermediate steps before producing a final answer can improve accuracy on complex reasoning tasks by 20–40% compared to asking for the answer directly. Multi-step pipelines apply this insight at an architectural level.

This is the first big idea: complexity doesn't scale linearly with prompt length. As tasks get more complex, single-prompt reliability degrades non-linearly. Pipelines solve this by decomposing complexity into units small enough for reliable, verifiable execution.

What Changes When You Break Work Into Steps

Let's make this concrete. Here's a naive single-prompt approach to code review:

## ❌ The monolithic approach — everything in one shot
import openai

client = openai.OpenAI()

def monolithic_code_review(source_code: str) -> str:
    """Ask the model to do everything at once."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "user",
                "content": f"""Review this code. Identify bugs, security issues,
                performance problems, and style violations. Suggest fixes for each.
                Write unit tests for the critical fixes. Summarize findings for
                a non-technical manager.
                
                Code:
                {source_code}"""
            }
        ]
    )
    return response.choices[0].message.content

This works for toy examples. In production, it fails in predictable ways: the model drops one of the requested analyses, conflates bug categories, writes tests that don't compile, or produces a summary that contradicts the detailed findings. Worse, when it fails, you have no idea which part failed or why.

Now compare this with the beginning of a pipeline approach:

## ✅ The pipeline approach — discrete, composable steps
import openai
from dataclasses import dataclass
from typing import List, Optional

client = openai.OpenAI()

@dataclass
class PipelineState:
    """Shared state passed between pipeline steps."""
    source_code: str
    bugs: Optional[List[str]] = None
    security_issues: Optional[List[str]] = None
    fixes: Optional[List[str]] = None
    tests: Optional[str] = None
    summary: Optional[str] = None

def step_identify_bugs(state: PipelineState) -> PipelineState:
    """Step 1: Only find bugs. Nothing else."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": "You are a bug detection specialist. List only functional bugs."
            },
            {
                "role": "user",
                "content": f"List all functional bugs in this code:\n\n{state.source_code}"
            }
        ]
    )
    state.bugs = parse_list(response.choices[0].message.content)
    return state

def step_identify_security_issues(state: PipelineState) -> PipelineState:
    """Step 2: Only analyze security. Receives focused context."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": "You are a security auditor. Focus only on security vulnerabilities."
            },
            {
                "role": "user",
                "content": f"Identify security vulnerabilities:\n\n{state.source_code}"
            }
        ]
    )
    state.security_issues = parse_list(response.choices[0].message.content)
    return state

def run_pipeline(source_code: str) -> PipelineState:
    """Compose steps into a pipeline, threading state through each."""
    state = PipelineState(source_code=source_code)
    state = step_identify_bugs(state)
    state = step_identify_security_issues(state)
    # ... additional steps follow
    return state

def parse_list(text: str) -> List[str]:
    """Helper: parse newline-separated items from model output."""
    return [line.strip() for line in text.strip().split('\n') if line.strip()]

Notice what's different. Each step has a single responsibility. The system prompt constrains the model to one cognitive task. The output of each step is structured, verifiable, and passed explicitly to the next step. If step 2 produces bad results, you know exactly where to look. You can test each step independently. You can log intermediate states. You can swap out one step without touching the others.

This is the engineering principle of separation of concerns applied to AI systems. And it changes everything about how you build, test, and debug agents.

💡 Pro Tip: When designing pipeline steps, apply the "would a junior developer understand this job description?" test. If a step's purpose can't be described in one sentence, it's probably doing too much.

Reliability, Debuggability, and the Production Reality

Here's a distinction that separates hobbyist AI experiments from production systems: debuggability. In a monolithic prompt system, when something goes wrong, your debugging options are essentially: tweak the prompt and try again. You're operating a black box. There's no way to know if the failure happened during the analysis phase, the generation phase, or the summarization phase.

Pipelines transform AI systems from black boxes into observable workflows. Each step boundary is a natural inspection point. You can log inputs and outputs at every node. You can set up validation checks that catch malformed outputs before they corrupt downstream steps. You can replay a specific step with different parameters without re-running the entire pipeline.

🎯 Key Principle: Observability is not optional in production AI systems. A pipeline that can't be inspected, tested, and debugged in stages is a liability, not an asset. Structure your agent workflows so that every step boundary is a potential logging and validation point.

💡 Real-World Example: A team building an AI-powered document processing system initially used a single prompt to extract, classify, summarize, and route documents. Their error rate was around 12% and debugging each failure took hours of manual inspection. After refactoring into a 5-step pipeline, they reduced their error rate to 3% and cut debugging time by 80% — not because the model got smarter, but because they could now see exactly which step was failing and why.

Debuggability also enables a practice that's essentially impossible with monolithic prompts: step-level unit testing. You can create a test suite with known inputs and expected outputs for each individual step. This is the same practice that makes traditional software reliable, now applied to AI.
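Concretely, step-level testing becomes practical once the model call is injectable. The sketch below assumes a simplified version of the earlier PipelineState and a hypothetical call_model parameter standing in for the real API client:

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

@dataclass
class PipelineState:
    source_code: str
    bugs: Optional[List[str]] = None

def parse_list(text: str) -> List[str]:
    """Parse newline-separated items from model output."""
    return [line.strip() for line in text.strip().split("\n") if line.strip()]

def step_identify_bugs(
    state: PipelineState,
    call_model: Callable[[str], str],
) -> PipelineState:
    """Same single-responsibility step, with the LLM call injected."""
    state.bugs = parse_list(call_model(state.source_code))
    return state

def test_step_identify_bugs():
    # Stub the model with a canned response: no network, fully deterministic
    fake_model = lambda code: "off-by-one in loop\nunclosed file handle\n"
    state = step_identify_bugs(PipelineState(source_code="..."), fake_model)
    assert state.bugs == ["off-by-one in loop", "unclosed file handle"]

test_step_identify_bugs()
```

Because every step shares the same contract — state in, state out — the same test harness works for every node in the pipeline.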

The Spectrum of Pipeline Complexity

Not every agentic task needs the same pipeline architecture. One of the most important skills you'll develop in this lesson is reading a problem and choosing the right level of complexity. Think of pipeline patterns on a spectrum:

SIMPLE                                                             COMPLEX
   │                                                                  │
   ▼                                                                  ▼
┌────────────┐   ┌──────────────┐   ┌─────────────┐   ┌──────────────────┐
│ Sequential │   │  Conditional │   │  Fan-Out /  │   │    Ensemble /    │
│   Chain    │──▶│   Routing    │──▶│   Fan-In    │──▶│   Multi-Agent    │
└────────────┘   └──────────────┘   └─────────────┘   └──────────────────┘
   A→B→C          if X then A        A→[B,C,D]→E       A vs B → best(A,B)
                  else B             (parallel)        (voting/critique)

At the simplest end, a sequential chain is just a series of steps where output A feeds input B, which feeds input C. This is the pattern you already saw in the code review example. It's the right choice when your task has a clear linear progression and each step depends on all previous steps.

Routers add conditional branching — the pipeline examines intermediate state and decides which path to take next. Maybe a classification step determines that an incoming document is a legal contract versus a marketing email, and routes each type to a specialized processing branch. Routing is what makes pipelines adaptive rather than mechanical.
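A router can be little more than a classification step whose label selects the next node. The sketch below uses a keyword check as a stand-in classifier and hypothetical branch handlers; in a real pipeline, classify would be an LLM call returning a label:

```python
from typing import Callable, Dict

def process_contract(text: str) -> str:
    """Hypothetical specialized branch for legal contracts."""
    return f"[contract pipeline] {text[:30]}"

def process_marketing(text: str) -> str:
    """Hypothetical specialized branch for marketing copy."""
    return f"[marketing pipeline] {text[:30]}"

def classify(text: str) -> str:
    """Stand-in classifier; in production this would be an LLM call."""
    return "contract" if "hereby" in text.lower() else "marketing"

ROUTES: Dict[str, Callable[[str], str]] = {
    "contract": process_contract,
    "marketing": process_marketing,
}

def route(text: str) -> str:
    label = classify(text)
    handler = ROUTES.get(label, process_marketing)  # fallback branch
    return handler(text)

print(route("The parties hereby agree..."))  # routed to the contract branch
```

Keeping the routing table as data (a dict of label to handler) makes adding a new branch a one-line change.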

Fan-out / fan-in patterns are where multi-step pipelines start to look truly different from sequential code. In a fan-out, one step spawns multiple parallel branches that execute simultaneously. In a fan-in, those branches reconverge, and their outputs are aggregated. This is the pattern that makes AI pipelines genuinely faster than sequential approaches for tasks that can be parallelized — processing 10 document chunks simultaneously instead of one at a time.
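The fan-out/fan-in shape can be sketched in a few lines. This version uses a thread pool, a common choice when the underlying API client is blocking; summarize_chunk is a stand-in for a real model call:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List

def summarize_chunk(chunk: str) -> str:
    """Stand-in for a blocking LLM call that summarizes one chunk."""
    return f"summary({chunk[:20]})"

def fan_out_fan_in(chunks: List[str]) -> str:
    # Fan-out: summarize every chunk concurrently on worker threads
    with ThreadPoolExecutor(max_workers=10) as pool:
        partials = list(pool.map(summarize_chunk, chunks))
    # Fan-in: pool.map preserves input order, so aggregation is deterministic
    return " | ".join(partials)

chunks = [f"chunk {i} text" for i in range(10)]
print(fan_out_fan_in(chunks))
```

With 10 workers, the slowest single chunk determines total latency rather than the sum of all chunks.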

Ensemble patterns take parallelism further: multiple agents or prompting strategies tackle the same task independently, and a final step selects, combines, or critiques their outputs. This is how you build systems that are more reliable than any single model call.
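In its simplest form, an ensemble runs several independent strategies on the same task and majority-votes over their answers. A minimal sketch with hard-coded stand-in agents:

```python
from collections import Counter

# Stand-ins for independent agents or prompting strategies
def agent_a(question: str) -> str: return "42"
def agent_b(question: str) -> str: return "42"
def agent_c(question: str) -> str: return "41"

def ensemble_vote(question: str) -> str:
    """Ask every agent the same question; return the majority answer."""
    answers = [agent(question) for agent in (agent_a, agent_b, agent_c)]
    winner, _votes = Counter(answers).most_common(1)[0]
    return winner

print(ensemble_vote("What is 6 * 7?"))  # the outlier agent is outvoted
```

The selection step need not be a vote: it could equally be another LLM call that critiques the candidates and picks or merges the best one.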

🧠 Mnemonic: Think SRFE: Sequential, Router, Fan-out/in, Ensemble. Each letter adds a new dimension of workflow intelligence: order, decisions, parallelism, and redundancy.

The Core Patterns This Lesson Covers

This lesson and its child topics are structured to take you from the foundational vocabulary to hands-on implementation of all four pattern types. Here's the map:

📋 Quick Reference Card: Pipeline Patterns Overview

🔧 Pattern | 📚 What It Does | 🎯 Best For
🔗 Sequential Chain | Steps execute one after another | Linear transformations, progressive refinement
🔀 Router | Conditional branching at runtime | Multi-class tasks, confidence-based fallbacks
📡 Fan-Out / Fan-In | Parallel execution, then aggregation | Batch processing, speed optimization
🎭 Ensemble | Multiple agents on the same task | High-stakes accuracy, self-critique

This lesson (the one you're reading now) establishes the mental models and vocabulary. The subsequent sections build out the anatomy of pipeline nodes and state, dive deep into routing logic, walk through a complete realistic implementation, and catalog the failure modes you'll inevitably encounter. The child lessons on prompt chaining and fan-out/fan-in patterns then go deep on implementing each at scale.

Where Pipelines Fit in the Agentic AI Lifecycle

It's worth zooming out for a moment to understand how pipeline patterns fit into the broader picture of agentic AI development. Building an agent isn't a single-shot activity — it's an engineering lifecycle with distinct phases:

┌─────────────────────────────────────────────────────────────────┐
│               Agentic AI Development Lifecycle                  │
├────────────┬──────────────┬──────────────┬──────────────────────┤
│  1. Design │ 2. Prototype │ 3. Structure │  4. Harden & Scale   │
│            │              │              │                      │
│ Define     │ Single-shot  │ ◀── YOU ARE  │ Observability,       │
│ task scope │ prompts to   │    HERE ──▶  │ testing, routing,    │
│ & success  │ validate     │ Decompose    │ parallelism, error   │
│ criteria   │ feasibility  │ into         │ recovery             │
│            │              │ pipelines    │                      │
└────────────┴──────────────┴──────────────┴──────────────────────┘

Pipeline design happens at the critical junction between "proving something is possible" (prototype) and "making it work reliably at scale" (harden). Many developers make the mistake of staying in prototype mode too long — iterating endlessly on single prompts instead of making the structural leap to pipeline architecture. The result is systems that work in demos but collapse in production.

⚠️ Common Mistake:

Mistake 1: Treating pipeline architecture as "over-engineering" for simple tasks. ⚠️

The test isn't complexity of the prompt — it's whether you need reliability, observability, and the ability to update parts independently. If yes, pipelines are the right tool even if the task feels simple.

Mistake 2: Designing the pipeline before validating individual steps. ⚠️

Before wiring steps together, test each step in isolation with representative inputs. A pipeline built on unvalidated steps just propagates errors with extra infrastructure around them.

Pipelines are also the architectural foundation that makes other agentic capabilities practical. Tool use becomes tractable when you can structure a step that decides which tool to call, another step that calls it, and another that interprets the result. Memory and context management become manageable when you control exactly what state gets passed between steps. Human-in-the-loop review points become natural insertion points at step boundaries. Everything in the agentic AI stack gets better when you have an explicit, structured pipeline underneath it.

💡 Mental Model: Think of a pipeline as the skeleton of your agent. Individual AI calls are the muscles — they do the actual work — but without the skeleton providing structure, shape, and load-bearing capability, the muscles can't coordinate to do anything useful. The pipeline is what gives your agent its form.

❌ Wrong thinking: "I'll build it as a single prompt first and refactor into a pipeline if it gets complex."

✅ Correct thinking: "I'll design the pipeline structure first based on the task's natural decomposition, then implement and validate each step. Adding complexity is easier than restructuring a system that's already in production."

The Paradigm Shift

Here is the core reframe that everything else in this lesson builds on: building with AI is not prompt engineering, it's software engineering that happens to involve prompts. The practices that make traditional software reliable — modularity, testability, observability, separation of concerns — apply directly to AI systems. Multi-step pipelines are simply the mechanism that lets you apply those practices.

When you shift from single prompts to structured pipelines, several things change at once:

🧠 Reasoning quality improves — models perform better on focused, bounded tasks than sprawling ones
📚 Debugging becomes tractable — failures are localized to specific steps with observable inputs and outputs
🔧 Teams can collaborate — different engineers can own different steps, testing and improving them independently
🎯 Iteration gets faster — changing one step doesn't require re-validating the entire workflow from scratch
🔒 Reliability scales — you can add validation, retries, and fallbacks at each step boundary independently

This is why the engineering teams building the most capable production AI systems don't just think about models — they think about workflow architecture. The model is one component in a larger system. And the quality of that system is determined not just by what models you choose, but by how thoughtfully you orchestrate them.

The rest of this lesson is about giving you the specific tools, vocabulary, and patterns to build those systems well. Starting with the fundamental building blocks — nodes, edges, and state — in the next section.

Anatomy of an Agent Pipeline: Nodes, Edges, and State

Before you can build a multi-step agent system that reliably solves real problems, you need a mental model precise enough to reason about what can go wrong — and expressive enough to design what should go right. That mental model is the pipeline graph: a structured way of thinking about your agent's work as a collection of discrete processing steps, the connections between them, and the shared data that flows through the whole system.

This section establishes the foundational vocabulary and concepts you will use throughout this lesson and its sub-topics. By the end, you will be able to look at any agentic workflow — no matter how complex — and decompose it into its constituent primitives.

The Three Primitives: Nodes, Edges, and State

Every agent pipeline, from the simplest two-step chain to a sprawling multi-agent orchestration system, is built from exactly three primitives.

Nodes (also called steps) are the individual units of processing in your pipeline. A node is any discrete operation that takes some input, does something with it, and produces some output. A node might call an LLM, execute a web search, parse a document, validate data, call an external API, or apply a business rule. The key characteristic of a node is that it has a single, well-defined responsibility — it does one thing and does it clearly.

Edges (also called transitions) are the connections between nodes. An edge defines the relationship between two steps: specifically, that the output of one node should flow into the input of another. Edges are what give your pipeline its shape. A linear sequence of edges produces a chain. Edges that split from one node to many produce a fan-out. Edges that converge from many nodes into one produce a fan-in. Edges that select one of several possible next nodes based on a condition produce a router.

State is the shared data object that travels through your pipeline from start to finish. Every node reads from and writes to this shared state. State is what allows nodes to communicate with each other without being directly coupled — node A does not call node B; instead, node A updates the state, and node B reads from it.

┌──────────────────────────────────────────────────────────┐
│                      PIPELINE GRAPH                      │
│                                                          │
│   ┌────────┐   edge    ┌────────┐   edge    ┌────────┐   │
│   │ Node A │──────────▶│ Node B │──────────▶│ Node C │   │
│   └────────┘           └────────┘           └────────┘   │
│        │                    │                    │       │
│        └────────────────────┴────────────────────┘       │
│                             │                            │
│                   ┌─────────▼─────────┐                  │
│                   │   Shared State    │                  │
│                   │  { key: value }   │                  │
│                   └───────────────────┘                  │
└──────────────────────────────────────────────────────────┘

💡 Mental Model: Think of a pipeline like an assembly line in a factory. Each workstation (node) performs a specific task on the product passing through it. The conveyor belt connecting workstations is the edge. The product itself — accumulating modifications as it travels — is the state. No workstation needs to know about the full factory layout; it only needs to know what to do with what arrives on the belt.

How Data Flows and Transforms Between Stages

Understanding data flow is the most important practical skill in pipeline design. As state passes through each node, it undergoes transformation: some fields get read, some fields get written, and some fields get overwritten. The cumulative effect of all these transformations across all nodes is what produces the pipeline's final output.

There are three common patterns for how a node interacts with state:

🔧 Read-only nodes consume data from state without modifying it. A validation node might check that required fields are present and throw an error if not, but it doesn't change what it reads.

🔧 Write nodes add new keys to state. A document-parsing node might read a raw document_text field and write a new parsed_sections field, leaving the original intact.

🔧 Transform nodes read existing data and overwrite it with a new value. A summarization node might read full_text and overwrite it with a condensed version, or write to a new summary key to preserve both.

🎯 Key Principle: Prefer writing new keys over overwriting existing ones. Preserving the original data in state makes your pipeline dramatically easier to debug — you can inspect every transformation at every stage by examining the final state object.
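The difference is easy to see with a dict-based state; summarize here is a stand-in for any transformation:

```python
def summarize(text: str) -> str:
    """Stand-in transformation."""
    return text[:40] + "..."

# ❌ Overwriting: the original input is gone from the final state
def transform_overwrite(state: dict) -> dict:
    state["full_text"] = summarize(state["full_text"])
    return state

# ✅ New key: both the input and the output survive for later inspection
def transform_new_key(state: dict) -> dict:
    state["summary"] = summarize(state["full_text"])
    return state

final = transform_new_key({"full_text": "A long document body..."})
print(sorted(final))  # both 'full_text' and 'summary' are present
```

With the new-key style, dumping the final state shows the input and output of every stage side by side.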

⚠️ Common Mistake: Designing nodes that reach "sideways" into state fields they don't own. If your summarization node also modifies the metadata.source_url field, you have created a hidden dependency that will cause confusion when that field behaves unexpectedly. Each node should have a clear contract about which fields it reads and which it writes.

Synchronous vs. Asynchronous Step Execution

Pipelines can execute their nodes in two modes, and choosing the right one matters both for correctness and performance.

In synchronous execution, each node runs to completion before the next one begins. The pipeline moves through nodes one at a time, in order. This is the simplest model and the right default for sequential chains where each step depends on the output of the previous step.

In asynchronous execution, nodes can be initiated without waiting for previous nodes to complete. This is essential for parallel workflows — patterns like fan-out where multiple independent operations (web searches, sub-agent calls, document retrievals) can proceed simultaneously and their results collected afterward.

SYNCHRONOUS (sequential):
─────────────────────────
Time ──────────────────────────────────────────▶

[  Node A  ][  Node B  ][  Node C  ]


ASYNCHRONOUS (parallel):
────────────────────────
Time ──────────────────────────────────────────▶

[  Node A  ]
           [  Node B  ]
           [  Node C  ]
           [  Node D  ]
                      [ Fan-in / Aggregator ]

In practice, most real pipelines mix both modes. The sequential parts of the workflow (where each step's output feeds the next) run synchronously. The independent sub-tasks within a stage (where several LLM calls or tool calls can proceed in parallel) run asynchronously. The fan-in step that collects results then runs synchronously once all async work completes.

💡 Pro Tip: When in doubt, start synchronous. Async execution adds complexity around error handling, partial failures, and result ordering. Only introduce parallelism once you've confirmed that a stage's sub-tasks are truly independent and that latency is a real bottleneck.

🤔 Did you know? The Python asyncio library's gather() function is the workhorse of fan-out patterns. Calling await asyncio.gather(task_a(), task_b(), task_c()) launches all three coroutines concurrently and waits until all complete, returning their results as a list in the order they were passed — even if they finished in a different order.
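A minimal sketch of that pattern, with fetch standing in for any async LLM or tool call:

```python
import asyncio
from typing import List

async def fetch(source: str) -> str:
    """Stand-in for an async LLM or search call."""
    await asyncio.sleep(0.1)  # the three sleeps below overlap, not stack
    return f"result from {source}"

async def main() -> List[str]:
    # Launch all three concurrently; total wall time is ~0.1s, not ~0.3s
    return await asyncio.gather(fetch("A"), fetch("B"), fetch("C"))

results = asyncio.run(main())
print(results)  # order matches the order passed to gather()
```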

Representing Pipeline State in Code

The state object is the backbone of your pipeline, so how you represent it in code has significant consequences for maintainability, debuggability, and correctness. You have three main options, each with trade-offs.

Option 1: Plain Dictionaries

The simplest approach is a plain Python dict. It's flexible, requires no setup, and every developer already knows how to use it.

## A plain dictionary as pipeline state
state = {
    "user_query": "Summarize the latest earnings report for ACME Corp.",
    "retrieved_documents": None,
    "summary": None,
    "metadata": {
        "pipeline_id": "run-001",
        "started_at": "2024-01-15T09:00:00Z"
    }
}

def retrieval_node(state: dict) -> dict:
    """Fetches documents and writes them into state."""
    # In a real system, this would call a vector DB or search API
    state["retrieved_documents"] = [
        "ACME Corp reported Q4 revenue of $2.1B, up 12% YoY...",
        "Operating margins expanded by 200bps driven by cost controls..."
    ]
    return state

def summarization_node(state: dict) -> dict:
    """Reads documents from state, writes a summary."""
    docs = state["retrieved_documents"]
    # In a real system, this would call an LLM
    combined = " ".join(docs)
    state["summary"] = f"Summary of {len(docs)} documents: {combined[:100]}..."
    return state

Dictionaries become unwieldy quickly. There is no type checking and no IDE autocomplete on state fields, and a typo in a key name ("retreived_documents" instead of "retrieved_documents") silently creates a new entry on write, or returns None from a .get() read, rather than raising an immediate error.

Option 2: Dataclasses

Python dataclasses give you structure, type hints, and IDE support with minimal boilerplate. They strike a good balance for most pipeline implementations.

from dataclasses import dataclass, field
from typing import Optional, List

@dataclass
class PipelineState:
    """Typed state object for the document summarization pipeline."""
    user_query: str
    retrieved_documents: Optional[List[str]] = None
    summary: Optional[str] = None
    pipeline_id: str = field(default_factory=lambda: "run-001")
    error: Optional[str] = None  # Capture errors without crashing the pipeline

def retrieval_node(state: PipelineState) -> PipelineState:
    """Fetches documents; type-safe access to state fields."""
    # IDE will autocomplete state.retrieved_documents
    # Typos in field names raise AttributeError immediately
    state.retrieved_documents = [
        "ACME Corp reported Q4 revenue of $2.1B, up 12% YoY...",
        "Operating margins expanded by 200bps driven by cost controls..."
    ]
    return state

def summarization_node(state: PipelineState) -> PipelineState:
    """Reads typed fields; produces a typed summary."""
    if not state.retrieved_documents:
        state.error = "No documents retrieved; cannot summarize."
        return state
    combined = " ".join(state.retrieved_documents)
    state.summary = f"Summary: {combined[:100]}..."
    return state

Option 3: Pydantic Models (Typed Schemas)

For production systems, Pydantic models add runtime validation on top of the type hints — meaning the pipeline will raise a clear, descriptive error if a node writes a value of the wrong type, rather than silently passing invalid data downstream.

from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime, timezone

class PipelineState(BaseModel):
    """Pydantic model with runtime validation for pipeline state."""
    user_query: str
    retrieved_documents: Optional[List[str]] = None
    summary: Optional[str] = None
    confidence_score: Optional[float] = Field(None, ge=0.0, le=1.0)  # Must be 0-1
    started_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    error: Optional[str] = None

    class Config:
        # Prevent setting arbitrary fields not defined in the schema
        extra = "forbid"
        # Re-validate constraints whenever a node assigns to a field
        validate_assignment = True

With Pydantic, if a node tries to write state.confidence_score = 1.5, the model raises a ValidationError immediately rather than silently storing a semantically invalid value that causes mysterious behavior ten nodes later.
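Note that Pydantic only re-checks constraints on assignment when validate_assignment is enabled. A small self-contained demonstration, using a hypothetical ScoredState model:

```python
from typing import Optional
from pydantic import BaseModel, Field, ValidationError

class ScoredState(BaseModel):
    user_query: str
    confidence_score: Optional[float] = Field(None, ge=0.0, le=1.0)

    class Config:
        validate_assignment = True  # re-check constraints on every field write

state = ScoredState(user_query="q")
state.confidence_score = 0.9        # fine: within [0, 1]
try:
    state.confidence_score = 1.5    # violates le=1.0
except ValidationError:
    print("out-of-range write rejected at the node boundary")
```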

📋 Quick Reference Card: State Representation Options

Criterion | 🔧 Plain Dict | 📚 Dataclass | 🔒 Pydantic Model
🎯 Type Safety | ❌ None | ✅ Static only | ✅ Static + Runtime
🔧 IDE Support | ❌ Poor | ✅ Good | ✅ Excellent
⚡ Performance | ✅ Fastest | ✅ Fast | ⚠️ Small overhead
🛡️ Validation | ❌ None | ❌ None | ✅ Built-in
📦 Serialization | ✅ Native | ⚠️ Manual (dataclasses.asdict) | ✅ .model_dump()
🎯 Best For | Prototypes | Mid-size pipelines | Production systems

Building a Minimal Two-Step Pipeline from Scratch

Concepts solidify through construction. Let's build a complete, runnable two-step pipeline that demonstrates all three primitives — nodes, edges, and state — working together. This pipeline will accept a raw user question, retrieve simulated context documents, and then summarize them.

from dataclasses import dataclass, field
from typing import Optional, List, Callable
import time

## ─── 1. DEFINE STATE ──────────────────────────────────────────────────────────

@dataclass
class ResearchState:
    """Shared state passed through every node in the research pipeline."""
    query: str
    documents: Optional[List[str]] = None
    summary: Optional[str] = None
    execution_log: List[str] = field(default_factory=list)

## ─── 2. DEFINE NODES ──────────────────────────────────────────────────────────

def retrieval_node(state: ResearchState) -> ResearchState:
    """
    NODE 1: Simulates fetching documents relevant to the query.
    In production, this would call a vector database or search API.
    """
    state.execution_log.append(f"[retrieval_node] Processing query: '{state.query}'")
    
    # Simulated document retrieval (replace with real search in production)
    state.documents = [
        f"Document 1: Background context about '{state.query}' from source A.",
        f"Document 2: Detailed analysis of '{state.query}' from source B.",
        f"Document 3: Recent developments related to '{state.query}' from source C.",
    ]
    
    state.execution_log.append(
        f"[retrieval_node] Retrieved {len(state.documents)} documents."
    )
    return state


def summarization_node(state: ResearchState) -> ResearchState:
    """
    NODE 2: Synthesizes the retrieved documents into a summary.
    In production, this would call an LLM via an API.
    """
    state.execution_log.append("[summarization_node] Beginning synthesis.")
    
    if not state.documents:
        # Handle missing upstream data gracefully
        state.summary = "No documents available to summarize."
        state.execution_log.append("[summarization_node] WARNING: No documents found.")
        return state
    
    # Simulated synthesis (replace with LLM call in production)
    doc_count = len(state.documents)
    state.summary = (
        f"Based on {doc_count} sources, here is a summary regarding '{state.query}': "
        f"[Synthesis of {doc_count} documents would appear here from an LLM call.]"
    )
    
    state.execution_log.append("[summarization_node] Summary generated successfully.")
    return state

# ─── 3. DEFINE EDGES (the pipeline runner) ────────────────────────────────────

def run_pipeline(
    initial_state: ResearchState,
    nodes: List[Callable[[ResearchState], ResearchState]]
) -> ResearchState:
    """
    Executes nodes in sequence, passing state through each one.
    Each node in the list is an 'edge' connecting it to the next.
    """
    state = initial_state
    for node in nodes:
        start = time.perf_counter()
        state = node(state)          # Pass state in, receive updated state out
        elapsed = time.perf_counter() - start
        print(f"  ✓ {node.__name__} completed in {elapsed:.4f}s")
    return state

# ─── 4. RUN IT ────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Initialize state with just the user's query
    initial_state = ResearchState(query="transformer architecture in LLMs")
    
    # Define the pipeline as an ordered list of nodes (edges are implicit)
    pipeline = [retrieval_node, summarization_node]
    
    print("Running pipeline...")
    final_state = run_pipeline(initial_state, pipeline)
    
    print("\n─── FINAL STATE ───")
    print(f"Query:     {final_state.query}")
    print(f"Docs:      {len(final_state.documents)} retrieved")
    print(f"Summary:   {final_state.summary}")
    print("\n─── EXECUTION LOG ───")
    for entry in final_state.execution_log:
        print(f"  {entry}")

This example is deliberately simple, but every concept from this section is present. The ResearchState dataclass is the state primitive. retrieval_node and summarization_node are the node primitives. The ordered list [retrieval_node, summarization_node] passed to run_pipeline defines the edges — the implicit rule that retrieval feeds into summarization. The run_pipeline function is the orchestrator, the component responsible for traversing the graph.

💡 Real-World Example: LangGraph, LlamaIndex Workflows, and similar frameworks are essentially sophisticated versions of this exact pattern. They formalize nodes as graph vertices with defined input/output contracts, edges as explicit transitions (sometimes with conditional logic), and state as a typed schema that the framework validates at each step. Understanding this minimal implementation means you understand the core of those frameworks at a conceptual level.

The Orchestrator's Role

Notice that in the example above, the individual nodes have no knowledge of each other. retrieval_node does not call summarization_node. Neither node knows how many steps exist in the pipeline, whether other steps succeeded, or what will happen after it returns. This is intentional.

The orchestrator — the component that runs the pipeline — is what holds the graph structure. It decides the order of execution, handles failures, routes between branches in conditional pipelines, and manages the lifecycle of the state object. Keeping orchestration logic out of nodes is what makes nodes reusable and testable in isolation.

Wrong thinking: "My summarization node should check if retrieval succeeded and retry if not."

Correct thinking: "My summarization node should check if state.documents is populated and return an appropriate error state. The orchestrator decides whether to retry the retrieval node."
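To make the division of labor concrete, here is a sketch of orchestrator-level retry in the style of the run_pipeline runner above. The max_retries parameter, the backoff constants, and the flaky_node helper are illustrative, not part of the lesson's code; the point is that the node never knows it was retried.

```python
import time
from typing import Callable, List

def run_pipeline_with_retries(state, nodes: List[Callable], max_retries: int = 2):
    """Orchestrator-level retry: nodes stay ignorant of retry policy.

    Assumes nodes are safe to re-run (idempotent) — a real system would
    checkpoint state before each attempt.
    """
    for node in nodes:
        for attempt in range(max_retries + 1):
            try:
                state = node(state)
                break                              # node succeeded; move on
            except Exception:
                if attempt == max_retries:
                    raise                          # ceiling reached; surface the error
                time.sleep(0.01 * (2 ** attempt))  # simple exponential backoff
    return state

# Usage sketch: a node that fails once, then succeeds on retry
calls = {"n": 0}
def flaky_node(state):
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("transient failure")
    state["done"] = True
    return state

result = run_pipeline_with_retries({"done": False}, [flaky_node])
```

Note that flaky_node contains no retry logic at all; the decision to retry, and how many times, lives entirely in the runner.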

🧠 Mnemonic: N-E-S — Nodes do the work, Edges define the path, State carries the memory. If any piece of logic doesn't fit cleanly into one of these three roles, you've found a design problem worth fixing.

From Mental Model to Production Thinking

The graph model is not just a conceptual convenience — it has direct implications for how you approach every aspect of pipeline development.

🎯 Testability: Because each node is a pure function (or close to one) that takes state and returns state, you can unit test every node in complete isolation. Create a state object with exactly the fields your node cares about, call the node, and assert on the output state. No mocking required.
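As a concrete illustration, here is what such an isolated test might look like for the empty-documents branch of summarization_node. The node is re-declared in trimmed form so the snippet stands alone; in a real test suite you would import it from your pipeline module.

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class ResearchState:
    query: str
    documents: Optional[List[str]] = None
    summary: Optional[str] = None
    execution_log: List[str] = field(default_factory=list)

def summarization_node(state: ResearchState) -> ResearchState:
    # Same guard as the lesson's node: handle missing upstream data gracefully
    if not state.documents:
        state.summary = "No documents available to summarize."
        state.execution_log.append("[summarization_node] WARNING: No documents found.")
        return state
    state.summary = f"Summary of {len(state.documents)} documents."
    return state

# Unit test: no mocks, no pipeline runner — just state in, state out
state = summarization_node(ResearchState(query="test"))
assert state.summary == "No documents available to summarize."
assert any("WARNING" in entry for entry in state.execution_log)
```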

🎯 Observability: The execution log in the example above is a primitive form of pipeline tracing — recording which nodes ran, in what order, and what they produced. Production systems extend this pattern with structured logging, span tracing (OpenTelemetry), and dedicated LLM observability platforms like LangSmith or Arize Phoenix.

🎯 Resumability: Because state is an explicit, serializable object (especially with Pydantic), you can checkpoint it to a database after each node completes. If your pipeline fails at node 7 of 12, you can resume from the checkpoint rather than rerunning from scratch — a critical capability for long-running agentic workflows that make expensive LLM calls.
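A minimal checkpointing sketch, assuming a plain-dataclass state (a trimmed ResearchState here) and JSON as the serialization format. A production system would write the blob to a database keyed by run ID and record which node completed last.

```python
import json
from dataclasses import dataclass, field, asdict

@dataclass
class ResearchState:
    query: str
    summary: str = ""
    execution_log: list = field(default_factory=list)

def save_checkpoint(state: ResearchState) -> str:
    """Serialize state after a node completes (write to a DB/file in production)."""
    return json.dumps(asdict(state))

def load_checkpoint(blob: str) -> ResearchState:
    """Rehydrate state to resume a failed pipeline from where it stopped."""
    return ResearchState(**json.loads(blob))

state = ResearchState(query="transformers", summary="partial result",
                      execution_log=["[retrieval_node] done"])
restored = load_checkpoint(save_checkpoint(state))
assert restored == state   # dataclass equality: resume exactly where we left off
```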

With these foundations in place — nodes as processing units, edges as transitions, state as shared memory, and the orchestrator as the conductor — you have everything you need to understand the more sophisticated patterns that follow: conditional routing, fan-out parallelism, fan-in aggregation, and multi-agent ensembles. Every one of those patterns is just a specific way of wiring together these three primitives.

Routing and Conditional Branching in Pipelines

In the previous section, we established that agent pipelines are graphs of nodes connected by data flow. But a graph without decision points is just a straight line — useful, but limited. What transforms a pipeline from a rigid assembly line into a genuinely intelligent system is its ability to route: to look at what it knows at a given moment and decide which path to take next. This section is about how that decision-making works, how to implement it reliably, and how to keep it from going wrong.

What Is a Router?

A router is a pipeline step whose primary job is not to transform data but to inspect the current state and select which step, branch, or sub-pipeline should execute next. Think of it as a traffic controller sitting at an intersection: it doesn't move the cars itself, but it decides which direction each one goes based on signals it reads in real time.

In code, a router is typically a function (or a lightweight LLM call) that takes the current pipeline state as input and returns a routing decision — often a string label, an integer index, or a structured object identifying the next node.

Incoming State
      │
      ▼
┌─────────────┐
│   ROUTER    │  ← inspects state, applies logic
└─────┬───────┘
      │
   decision
      │
  ┌───┴────────────────┐
  │                    │
  ▼                    ▼
[Branch A]         [Branch B]
Specialist Chain   Specialist Chain
  │                    │
  └─────────┬──────────┘
            ▼
      Merged Output

Routers sit at the connective tissue of a pipeline. They are what allow a single entry point to fan out into parallel specialist sub-pipelines, or to select one of several sequential chains depending on the nature of the input. Without routers, you either duplicate logic across separate pipelines or force every input through every step regardless of relevance — both of which are expensive and brittle.

🎯 Key Principle: A router doesn't do the work; it decides who does. Keeping routing logic separate from processing logic is one of the most important architectural decisions in agentic system design.

LLM-Based Routers vs. Deterministic Rule-Based Routers

There are two fundamentally different philosophies for implementing routing logic, and the choice between them has significant consequences for your system's reliability, cost, and maintainability.

Deterministic rule-based routers use explicit, human-authored logic: regular expressions, keyword matching, threshold comparisons, or structured business rules. If the input contains the word "refund," route to the billing chain. If the confidence score from the previous step is below 0.7, route to the human-review branch. These routers are fast, cheap, fully auditable, and behave identically given the same input every time.

LLM-based routers use a language model to classify or reason about the input and produce a routing decision. They are more flexible — capable of handling nuance, ambiguity, and cases that rule-based logic would miss — but they introduce latency, cost per call, and non-determinism. The same input might, rarely, produce a different routing decision on two separate calls.

📋 Quick Reference Card: Router Type Trade-offs

                                  🔧 Rule-Based Router    🧠 LLM-Based Router
🎯 Accuracy on clear cases        High                    High
🎯 Accuracy on ambiguous cases    Low                     High
⚡ Latency                        Near-zero               200ms–2s
💰 Cost per call                  Free                    Token cost
🔒 Determinism                    Fully deterministic     Probabilistic
🔧 Maintenance burden             High (rules rot)        Low (prompt tuning)
📚 Auditability                   Easy                    Requires logging

The practical recommendation is to use rule-based routing first, and reach for LLM-based routing only when rule complexity becomes unmanageable or when the classification task genuinely requires language understanding. Many production systems use a hybrid: a fast rule-based pre-filter handles the obvious cases ("this is clearly a billing question"), and an LLM classifier handles the remainder.

💡 Real-World Example: A customer support pipeline might use regex matching to immediately route messages containing an order ID to the order-lookup chain. Only messages that don't match any known pattern get passed to an LLM classifier that determines whether they're about billing, technical support, account management, or general inquiry.

Implementing a Classifier-Router in Python

Let's build a concrete example. We'll create a simple support ticket pipeline where an LLM-based router classifies incoming tickets and dispatches them to one of three specialist sub-chains: billing, technical support, or general inquiry.

First, the router itself:

import json
from openai import OpenAI
from typing import Literal

client = OpenAI()

# The categories our router can dispatch to
RouteLabel = Literal["billing", "technical", "general", "fallback"]

def classify_ticket(ticket_text: str, confidence_threshold: float = 0.75) -> RouteLabel:
    """
    LLM-based router: classifies a support ticket and returns a routing label.
    Falls back to 'fallback' if confidence is below the threshold.
    """
    system_prompt = """You are a support ticket classifier. 
    Classify the ticket into exactly one of: billing, technical, general.
    Respond with JSON only: {"label": "...", "confidence": 0.0}
    confidence should be a float between 0 and 1."""

    response = client.chat.completions.create(
        model="gpt-4o-mini",  # fast, cheap — good for routing
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": ticket_text}
        ],
        temperature=0  # deterministic output for routing
    )

    try:
        result = json.loads(response.choices[0].message.content)
        label = result.get("label", "fallback")
        confidence = float(result.get("confidence", 0.0))

        # If the model isn't confident enough, send to fallback
        if confidence < confidence_threshold:
            return "fallback"

        # Validate the label is one we recognize
        if label not in ("billing", "technical", "general"):
            return "fallback"

        return label

    except (json.JSONDecodeError, KeyError, ValueError):
        # Any parsing failure routes to fallback — never crash the pipeline
        return "fallback"

Notice two important design choices here. First, temperature=0 is set explicitly — you want routing decisions to be as deterministic as possible, and temperature zero dramatically reduces variance. Second, every failure mode — JSON parse errors, unrecognized labels, low confidence — routes to a "fallback" path rather than raising an exception. This is critical; we'll return to why shortly.

Now let's wire this router into a pipeline that dispatches to specialist handlers:

from dataclasses import dataclass
from typing import Callable

@dataclass
class PipelineState:
    """Shared state object that flows through the pipeline."""
    original_ticket: str
    route: str = ""
    response: str = ""
    escalated: bool = False

# --- Specialist sub-chains ---

def handle_billing(state: PipelineState) -> PipelineState:
    """Specialist chain for billing questions."""
    # In production, this might call a billing API, look up account data, etc.
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a billing specialist. Be concise and empathetic."},
            {"role": "user", "content": state.original_ticket}
        ]
    )
    state.response = response.choices[0].message.content
    return state

def handle_technical(state: PipelineState) -> PipelineState:
    """Specialist chain for technical issues."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a technical support engineer. Provide step-by-step solutions."},
            {"role": "user", "content": state.original_ticket}
        ]
    )
    state.response = response.choices[0].message.content
    return state

def handle_general(state: PipelineState) -> PipelineState:
    """Specialist chain for general inquiries."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",  # cheaper model for simpler queries
        messages=[
            {"role": "system", "content": "You are a helpful support agent."},
            {"role": "user", "content": state.original_ticket}
        ]
    )
    state.response = response.choices[0].message.content
    return state

def handle_fallback(state: PipelineState) -> PipelineState:
    """Fallback handler: escalate to human review."""
    state.response = "Your request has been escalated to our support team."
    state.escalated = True
    return state

# --- The router + dispatcher ---

ROUTE_MAP: dict[str, Callable[[PipelineState], PipelineState]] = {
    "billing": handle_billing,
    "technical": handle_technical,
    "general": handle_general,
    "fallback": handle_fallback,
}

def run_support_pipeline(ticket_text: str) -> PipelineState:
    """Run the full routing pipeline for a support ticket."""
    state = PipelineState(original_ticket=ticket_text)

    # Step 1: Route
    state.route = classify_ticket(ticket_text)
    print(f"Routed to: {state.route}")

    # Step 2: Dispatch to specialist handler
    handler = ROUTE_MAP.get(state.route, handle_fallback)
    state = handler(state)

    return state

# Example usage
result = run_support_pipeline("I was charged twice for my subscription this month.")
print(result.response)

This structure — a ROUTE_MAP dictionary that maps string labels to handler functions — is clean and extensible. Adding a new routing category requires adding one entry to the dictionary and one handler function. The router and the dispatcher are decoupled from each other, which makes testing straightforward: you can test the classifier separately from the handlers.
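Because the router and the dispatcher are decoupled, the dispatch logic can be verified with stubbed handlers and no API calls at all. A sketch — the one-line stub handlers here are illustrative stand-ins for the real specialist chains:

```python
from dataclasses import dataclass

@dataclass
class PipelineState:
    original_ticket: str
    route: str = ""
    response: str = ""
    escalated: bool = False

# Stub handlers: stamp the state instead of calling an LLM
def handle_billing(state):  state.response = "billing";  return state
def handle_fallback(state): state.response = "fallback"; state.escalated = True; return state

ROUTE_MAP = {"billing": handle_billing, "fallback": handle_fallback}

def dispatch(state: PipelineState) -> PipelineState:
    return ROUTE_MAP.get(state.route, handle_fallback)(state)

# Known labels reach their handler; unknown labels fall back safely
assert dispatch(PipelineState("x", route="billing")).response == "billing"
assert dispatch(PipelineState("x", route="nonsense")).escalated is True
```

The second assertion is the important one: an unrecognized label never crashes the dispatcher, it degrades to the fallback handler.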

💡 Pro Tip: When building LLM-based routers, always constrain the model's output to a known vocabulary. Ask for JSON with an explicit enum of valid labels. Unconstrained output from a router is a common source of silent failures — the model returns a label your dispatcher doesn't recognize, and suddenly nothing works.

Guarding Against Routing Loops and Unresolvable Branches

Routing introduces a category of bugs that sequential pipelines simply don't have: the possibility of loops. Imagine a pipeline where a router sends ambiguous inputs to a clarification step, and that clarification step feeds back into the same router. If the clarification doesn't actually resolve the ambiguity, you have an infinite loop — the pipeline spins forever, consuming tokens and money at each iteration.

⚠️ Common Mistake: Loops Without Termination Conditions

Building a feedback loop (router → clarification → router) without a maximum iteration count is one of the most common production failures in agentic systems. It can exhaust rate limits, run up API costs, and time out user-facing requests.

The fix is always to carry a hop count in your pipeline state and enforce a hard ceiling:

from typing import ClassVar

@dataclass
class PipelineState:
    original_ticket: str
    route: str = ""
    response: str = ""
    escalated: bool = False
    hop_count: int = 0                  # tracks how many routing decisions have been made
    MAX_HOPS: ClassVar[int] = 5         # hard ceiling — never route more than this many times

def route_with_guard(state: PipelineState) -> PipelineState:
    """
    Router wrapper that enforces the hop ceiling.
    If the pipeline has routed too many times, escalate immediately.
    """
    state.hop_count += 1

    if state.hop_count > state.MAX_HOPS:
        print(f"WARNING: hop ceiling exceeded after {state.hop_count} routing decisions")
        state.route = "fallback"
        state.escalated = True
        return state

    state.route = classify_ticket(state.original_ticket)
    return state

The hop count pattern generalizes beyond routing loops. Any time a pipeline can revisit a decision node, you need a counter and a ceiling.

Unresolvable branches are a related problem: a branch that is reachable by the router but that has no defined handler. This is the routing equivalent of a null pointer dereference. The .get(state.route, handle_fallback) pattern in the previous code example is the simplest defense — always provide a default handler that the get call can return.

🎯 Key Principle: Every router must have a fallback path that is guaranteed to terminate. A pipeline that can get into a state from which it cannot exit is not a production-grade system.

Hybrid Routing: Combining Rules and LLMs

In practice, the most robust routing architectures layer rule-based and LLM-based logic rather than choosing one exclusively. The pattern works as follows:

Incoming Input
      │
      ▼
┌───────────────────────┐
│ Rule-Based Pre-Filter │  ← fast, free, handles high-confidence cases
│ (regex, keywords,     │
│  exact matches)       │
└──────┬────────────────┘
       │
  ┌────┴──────────────┐
  │                   │
  ▼                   ▼
[Matched]         [Unmatched]
Route directly    │
to handler        ▼
              ┌────────────────┐
              │ LLM Classifier │  ← slower, costs tokens, handles ambiguity
              └──────┬─────────┘
                     │
               [label + confidence]
                     │
              ┌──────┴──────────┐
              │                 │
              ▼                 ▼
          [Confident]       [Uncertain]
          Route to          Route to
          specialist        fallback

This cascade approach means the LLM is only invoked when rule-based logic genuinely can't make the call. In a high-volume system, this can reduce LLM routing costs by 70–90% while maintaining classification quality for complex cases.
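A minimal sketch of the cascade, assuming a hypothetical ORD-###### order-ID format and a stubbed llm_classify as a stand-in for the real classify_ticket call:

```python
import re

ORDER_ID = re.compile(r"\b(ORD-\d{6})\b")   # illustrative pattern, not from the lesson

def llm_classify(text: str) -> str:
    # Stand-in for classify_ticket(): a real system calls the LLM here,
    # paying token cost and latency only for inputs that reach this point.
    return "general"

def cascade_route(text: str) -> str:
    """Rule-based pre-filter first; fall through to the LLM only when rules can't decide."""
    if ORDER_ID.search(text):
        return "order_lookup"               # high-confidence, free, deterministic
    if "refund" in text.lower():
        return "billing"
    return llm_classify(text)               # ambiguous remainder goes to the classifier

assert cascade_route("Where is ORD-123456?") == "order_lookup"
assert cascade_route("I want a refund") == "billing"
assert cascade_route("Tell me about your roadmap") == "general"
```

In a high-volume system, the two rule branches absorb the bulk of traffic before any token is spent.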

Routers as Connective Tissue Between Sub-Pipelines

Zooming out from implementation details, it's worth appreciating the architectural role routers play in larger systems. A router is not just a switch between two handlers — it is the mechanism that allows you to compose different pipeline topologies dynamically.

Consider a document processing pipeline. A router might:

  • Send short documents to a single-pass summarization chain (sequential)
  • Send long documents to a chunking pipeline that processes sections in parallel (fan-out)
  • Send documents in unknown languages to a translation step before re-entering the main pipeline
  • Send corrupted or unreadable documents to a human review queue

The router is what allows the same entry point to lead to radically different execution graphs depending on runtime conditions. This is why the sections that follow — covering fan-out/fan-in parallel execution and ensemble patterns — build directly on routing as a prerequisite: you cannot effectively parallelize work without a mechanism to decide which work deserves parallelization in the first place.

🤔 Did you know? The router pattern in agent pipelines is directly analogous to the strategy pattern in classical software design — a component that selects an algorithm (or sub-pipeline) from a family of alternatives at runtime. The key difference is that in agentic systems, the strategy selector itself can be an AI model, which is a capability classical OOP patterns never anticipated.

💡 Mental Model: Think of your pipeline as a city and your routers as the traffic signals. Sequential chains are one-way streets. Parallel sub-pipelines are multi-lane expressways. Routers are the intersections where the city's topology gets decided in real time. A city without well-designed intersections is gridlock; a pipeline without well-designed routers is a monolith.

Routing and Business Rules: A Note on Governance

One dimension of routing that is easy to overlook is governance: the idea that some routing decisions should not be left to an LLM even when an LLM would do a good job. In regulated industries — finance, healthcare, legal — there may be compliance requirements that certain types of input must be handled by human agents, or that certain data must not be processed by a specific sub-pipeline (for example, one that logs to an external service).

Rule-based routers excel here precisely because they are auditable and deterministic. If your compliance team needs to certify that "no patient data will ever be routed to the general-purpose LLM chain," you need a router whose logic can be reviewed by a non-technical auditor — not a black-box classifier.

Wrong thinking: "Our LLM router is 98% accurate, so it's fine for compliance-sensitive routing."

Correct thinking: "Compliance-sensitive routing decisions must be deterministic and auditable by default. LLM routers are reserved for cases where no business rule can be articulated clearly."
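A deterministic compliance guard might look like the following sketch. The PHI_PATTERNS entries and the human_review label are illustrative placeholders; a real deny-list would be defined with your compliance team and reviewed as policy, not code.

```python
import re
from typing import Optional

# Deterministic deny-list: every rule is readable by a non-technical auditor.
PHI_PATTERNS = [
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),      # SSN-shaped strings (illustrative)
    re.compile(r"\bpatient\b", re.IGNORECASE), # illustrative keyword rule
]

def compliance_guard(text: str) -> Optional[str]:
    """Runs BEFORE any LLM router. Returns a forced route, or None to continue."""
    for pattern in PHI_PATTERNS:
        if pattern.search(text):
            return "human_review"              # never reaches the LLM chain
    return None

assert compliance_guard("patient record for 123-45-6789") == "human_review"
assert compliance_guard("my invoice is wrong") is None
```

Because the guard is ordinary pattern matching, the certification "no matching input is ever routed to the LLM chain" can be verified by reading twenty lines of code.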

🧠 Mnemonic: SAFE routing — Statements of rules for compliance, AI classifiers for ambiguity, Fallbacks for everything else, Explicit hop limits always.

Putting It Together

Routers transform linear pipelines into decision trees, and decision trees into dynamic graphs that can adapt to the actual nature of each input at runtime. The key design principles bear repeating:

🔧 Implementation checklist for any router:

  • Every router has at least one fallback path that always terminates
  • Pipeline state carries a hop count; routing logic enforces a ceiling
  • LLM-based routers use temperature=0 and constrained output formats
  • Rule-based pre-filters run before LLM classifiers wherever possible
  • Compliance-sensitive decisions are always rule-based, never delegated to a model
  • Route labels are validated against a known set before dispatch

With routing established as a first-class concept, the next section will show these patterns in action inside a complete end-to-end pipeline — one that combines sequential processing, routing, and an initial look at parallel execution into a single coherent system you can adapt for your own projects.

Practical Pipeline Patterns: Building a Multi-Stage Processing System

Theory is only as useful as the intuition it builds when you sit down to write real code. The previous sections established the vocabulary — nodes, edges, state, routers — and showed how pipelines make branching decisions at runtime. Now it is time to bring those ideas together into a single, cohesive example you can study, extend, and adapt. By the end of this section you will have a concrete reference implementation of a document analysis pipeline that ingests raw text, classifies it, enriches it with additional context, and synthesizes a structured report. Along the way, you will see exactly how to structure Python code for clarity and testability, how to pass state between stages without losing traceability, and how to add the lightweight observability that separates a pipeline you can debug from one you can only guess at.

Designing the Pipeline: Four Stages, One Goal

Before writing a single line of Python, invest a few minutes in design. A document analysis pipeline can be decomposed into four natural stages that mirror how a skilled analyst actually works.

Raw Document
     │
     ▼
┌─────────────┐
│  INGESTION  │  Parse, clean, chunk
└──────┬──────┘
       │  DocumentContext
       ▼
┌──────────────────┐
│  CLASSIFICATION  │  Type, topic, urgency
└────────┬─────────┘
         │  DocumentContext + ClassificationResult
         ▼
┌───────────────┐      ┌──────────────────────┐
│    ROUTER     │─────▶│  ENRICHMENT (branch) │
└───────────────┘      │  - Legal enricher    │
  (selects branch)     │  - Financial enricher│
                       │  - General enricher  │
                       └──────────┬───────────┘
                                  │
                                  ▼
                         ┌─────────────────┐
                         │   SYNTHESIS     │  Final report
                         └─────────────────┘

Ingestion handles everything that needs to happen before the document is useful: stripping HTML, normalizing whitespace, splitting long documents into manageable chunks, and attaching metadata such as source URL, timestamp, and document ID. Classification sends the cleaned text to an LLM (or a lighter-weight classifier) and gets back a document type, a topic category, and a confidence score. Enrichment is where routing pays off — a legal document needs different additional context than a financial report or a customer support ticket. Synthesis takes the enriched document and produces the final artifact: a structured summary, a risk assessment, or an action list depending on what the pipeline is configured to produce.

🎯 Key Principle: Each stage should have exactly one responsibility. When a stage starts doing two different conceptual jobs, it is a signal to split it. Single-responsibility stages are independently testable, swappable, and reusable across pipelines.

The Context Object: Your Pipeline's Source of Truth

The most important design decision in any pipeline is how data travels between stages. The naive approach — returning raw strings or dictionaries — creates invisible coupling between stages and makes debugging a guessing game. The professional approach introduces a structured context object that every stage reads from and writes to.

A context object is not just a data container. It is a living audit log of what the pipeline has done. Every stage stamps its output into the context alongside a timestamp and status flag. When something goes wrong at stage three, you can inspect the context and see exactly what stages one and two produced.

import time
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PipelineContext:
    """Carries state through the entire document analysis pipeline."""

    # ── Identity ────────────────────────────────────────────────────────────
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    source: str = ""                    # e.g. "s3://bucket/file.pdf"

    # ── Stage outputs (populated as the pipeline runs) ───────────────────────
    raw_text: str = ""
    cleaned_text: str = ""
    chunks: list[str] = field(default_factory=list)

    doc_type: str = ""                  # "legal" | "financial" | "general"
    topic: str = ""
    classification_confidence: float = 0.0

    enrichment_data: dict = field(default_factory=dict)
    final_report: str = ""

    # ── Observability ────────────────────────────────────────────────────────
    stage_timings: dict[str, float] = field(default_factory=dict)
    stage_errors: dict[str, str] = field(default_factory=dict)
    completed_stages: list[str] = field(default_factory=list)

    def mark_stage_complete(self, stage_name: str, elapsed: float) -> None:
        self.completed_stages.append(stage_name)
        self.stage_timings[stage_name] = round(elapsed, 3)

    def mark_stage_failed(self, stage_name: str, error: str) -> None:
        self.stage_errors[stage_name] = error

    @property
    def succeeded(self) -> bool:
        return len(self.stage_errors) == 0

Notice a few deliberate choices. The run_id field gives every pipeline execution a unique identity, which is invaluable when logs from hundreds of concurrent runs land in the same log stream. The stage_timings dictionary makes performance regressions visible at a glance. The completed_stages list lets downstream stages verify that their prerequisites actually ran — a simple guard against pipeline misconfiguration.

💡 Pro Tip: Define your context object before you write any stage code. The shape of the context is the contract between stages. Getting it right first saves you from cascading refactors later.

Structuring the Stages: Functions, Classes, and the Decorator Pattern

Python gives you several ways to structure pipeline stages. The simplest is a plain function that accepts a PipelineContext, mutates it, and returns it. This is perfectly fine for small pipelines. For larger systems, wrapping each stage in a class lets you inject configuration (API keys, model names, chunking parameters) without threading them through every function signature.

The example below shows the ingestion and classification stages. Each stage follows the same pattern: record a start time, do the work, write results into the context, call mark_stage_complete, and return the context. This uniformity means you can build a generic pipeline runner that calls any stage in the same way.

import logging
import re
from openai import OpenAI  # or any LLM client of your choice

logger = logging.getLogger(__name__)
client = OpenAI()  # reads OPENAI_API_KEY from environment


# ── Stage 1: Ingestion ────────────────────────────────────────────────────────

def run_ingestion(ctx: PipelineContext, raw_text: str) -> PipelineContext:
    """Cleans raw text and splits it into chunks."""
    t0 = time.perf_counter()
    logger.info("[%s] ingestion: starting", ctx.run_id)

    try:
        ctx.raw_text = raw_text

        # Strip HTML tags and normalize whitespace
        no_html = re.sub(r"<[^>]+>", " ", raw_text)
        ctx.cleaned_text = " ".join(no_html.split())

        # Chunk into ~500-word segments (simple word-count strategy)
        words = ctx.cleaned_text.split()
        chunk_size = 500
        ctx.chunks = [
            " ".join(words[i : i + chunk_size])
            for i in range(0, len(words), chunk_size)
        ]

        logger.info(
            "[%s] ingestion: produced %d chunk(s)", ctx.run_id, len(ctx.chunks)
        )
        ctx.mark_stage_complete("ingestion", time.perf_counter() - t0)

    except Exception as exc:
        ctx.mark_stage_failed("ingestion", str(exc))
        logger.error("[%s] ingestion failed: %s", ctx.run_id, exc)
        raise  # let the runner decide whether to abort or continue

    return ctx


# ── Stage 2: Classification ───────────────────────────────────────────────────

CLASSIFICATION_PROMPT = """
You are a document classifier. Analyze the text and respond with JSON only.
Fields required:
  doc_type  : one of ["legal", "financial", "general"]
  topic     : a short descriptive phrase (max 8 words)
  confidence: a float between 0.0 and 1.0

Document (first 1000 chars):
{sample}
"""


def run_classification(ctx: PipelineContext) -> PipelineContext:
    """Calls an LLM to classify document type, topic, and confidence."""
    t0 = time.perf_counter()
    logger.info("[%s] classification: starting", ctx.run_id)

    assert "ingestion" in ctx.completed_stages, "Ingestion must run before classification"

    try:
        sample = ctx.cleaned_text[:1000]
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "Respond with valid JSON only."},
                {"role": "user", "content": CLASSIFICATION_PROMPT.format(sample=sample)},
            ],
            temperature=0,
            response_format={"type": "json_object"},
        )

        import json
        result = json.loads(response.choices[0].message.content)

        ctx.doc_type = result["doc_type"]
        ctx.topic = result["topic"]
        ctx.classification_confidence = float(result["confidence"])

        logger.info(
            "[%s] classification: type=%s confidence=%.2f",
            ctx.run_id, ctx.doc_type, ctx.classification_confidence,
        )
        ctx.mark_stage_complete("classification", time.perf_counter() - t0)

    except Exception as exc:
        ctx.mark_stage_failed("classification", str(exc))
        logger.error("[%s] classification failed: %s", ctx.run_id, exc)
        raise

    return ctx

Several practices here are worth calling out explicitly. The assert guard at the top of run_classification catches pipeline misconfiguration during development — if you accidentally wire stages in the wrong order, you get an immediate, descriptive error instead of a cryptic AttributeError. The temperature=0 setting on the classification call is intentional: classification is a deterministic task, and temperature introduces unwanted variance. Using response_format={"type": "json_object"} forces the model to return valid JSON, eliminating an entire class of parse failures.

⚠️ Common Mistake: Mistake 1 — Embedding the pipeline runner logic inside individual stage functions. When run_classification decides whether to call run_enrichment, you have lost the separation between what a stage does and how the pipeline is orchestrated. Keep stages pure: they receive a context, do their job, and return the context. Let a separate runner decide what runs next.

The Router and Enrichment Branch

With classification complete, the pipeline has enough information to route the document to the right enrichment stage. The router is a thin function — just a conditional — but its placement as an explicit, named step makes the pipeline's branching logic visible and testable.

## ── Router ────────────────────────────────────────────────────────────────────

LOW_CONFIDENCE_THRESHOLD = 0.65


def select_enricher(ctx: PipelineContext):
    """Returns the appropriate enrichment function based on classification."""
    assert "classification" in ctx.completed_stages

    if ctx.classification_confidence < LOW_CONFIDENCE_THRESHOLD:
        logger.warning(
            "[%s] router: low confidence (%.2f), routing to general enricher",
            ctx.run_id, ctx.classification_confidence,
        )
        return enrich_general

    routing_table = {
        "legal":     enrich_legal,
        "financial": enrich_financial,
        "general":   enrich_general,
    }
    enricher = routing_table.get(ctx.doc_type, enrich_general)
    logger.info("[%s] router: selected %s", ctx.run_id, enricher.__name__)
    return enricher


## ── Stage 3: Enrichment stubs (each would call specialized LLM prompts in production) ──

def enrich_legal(ctx: PipelineContext) -> PipelineContext:
    t0 = time.perf_counter()
    # In production: extract parties, jurisdiction, key clauses, risk flags
    ctx.enrichment_data = {
        "parties": ["Acme Corp", "Beta LLC"],
        "jurisdiction": "New York",
        "key_clauses": ["Indemnification", "Limitation of Liability"],
    }
    ctx.mark_stage_complete("enrichment", time.perf_counter() - t0)
    return ctx


def enrich_financial(ctx: PipelineContext) -> PipelineContext:
    t0 = time.perf_counter()
    # In production: extract figures, ratios, trends, risk indicators
    ctx.enrichment_data = {
        "revenue": "$4.2M",
        "yoy_growth": "+12%",
        "risk_flags": ["High debt-to-equity"],
    }
    ctx.mark_stage_complete("enrichment", time.perf_counter() - t0)
    return ctx


def enrich_general(ctx: PipelineContext) -> PipelineContext:
    t0 = time.perf_counter()
    ctx.enrichment_data = {"summary_points": ["No specialized enrichment applied."]}
    ctx.mark_stage_complete("enrichment", time.perf_counter() - t0)
    return ctx


## ── Stage 4: Synthesis ────────────────────────────────────────────────────────

def run_synthesis(ctx: PipelineContext) -> PipelineContext:
    """Combines classification and enrichment data into a final report."""
    t0 = time.perf_counter()
    logger.info("[%s] synthesis: starting", ctx.run_id)
    assert "enrichment" in ctx.completed_stages

    import json
    ctx.final_report = (
        f"# Document Analysis Report\n"
        f"**Run ID:** {ctx.run_id}\n"
        f"**Type:** {ctx.doc_type} | **Topic:** {ctx.topic}\n"
        f"**Confidence:** {ctx.classification_confidence:.0%}\n\n"
        f"## Enrichment Data\n```json\n"
        f"{json.dumps(ctx.enrichment_data, indent=2)}\n```\n"
    )
    ctx.mark_stage_complete("synthesis", time.perf_counter() - t0)
    return ctx


## ── Pipeline Runner ───────────────────────────────────────────────────────────

def run_pipeline(raw_text: str, source: str = "unknown") -> PipelineContext:
    """Executes the full document analysis pipeline and returns the final context."""
    ctx = PipelineContext(source=source)
    logger.info("[%s] pipeline: starting for source=%s", ctx.run_id, source)

    ctx = run_ingestion(ctx, raw_text)
    ctx = run_classification(ctx)
    enricher = select_enricher(ctx)
    ctx = enricher(ctx)
    ctx = run_synthesis(ctx)

    total = sum(ctx.stage_timings.values())
    logger.info(
        "[%s] pipeline: complete in %.3fs | stages: %s",
        ctx.run_id, total, ctx.stage_timings,
    )
    return ctx

The run_pipeline function is deliberately short. It reads like a table of contents for the pipeline's logic. Anyone who picks up this codebase can understand the high-level flow in seconds, then drill into individual stage functions for detail. This is the readability payoff of keeping stages small and naming them clearly.

Adding Observability: Logging and Step Timing

A pipeline you cannot observe is a pipeline you cannot improve. The PipelineContext already stores timing data in stage_timings. Combining that with structured logging gives you two complementary views of your pipeline's behavior: a chronological event stream for debugging individual runs, and aggregated timing data for spotting performance bottlenecks across many runs.

💡 Real-World Example: A team running this pipeline against a corpus of 500 legal documents discovered that classification was taking 4–6 seconds per document — nearly 80% of total pipeline time. The timing data made the bottleneck undeniable. They added a caching layer keyed on document hash, which reduced repeat classification calls to milliseconds and cut overall processing time by 70%.

📋 Quick Reference Card: Stage Timing Interpretation

| ⏱️ Observed Pattern | 🔍 Likely Cause | 🔧 Suggested Action |
|---|---|---|
| 🔒 One stage dominates total time | LLM call with large prompt | Cache, truncate, or batch |
| 📚 All stages roughly equal | Well-balanced pipeline | No action needed |
| 🎯 High variance across runs | Network latency or rate limits | Add retries, use async |
| 🧠 Synthesis slowest | Too much data in context | Summarize earlier in pipeline |

Beyond timing, consider adding a simple pipeline summary log at the end of every run that emits the run ID, document type, confidence, total time, and whether any stages failed. This one-line summary is easy to aggregate in a log management tool and gives you an immediate health dashboard for your pipeline fleet.
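A minimal sketch of such a summary line, using a stand-in dataclass whose field names mirror this lesson's PipelineContext (the exact fields here are assumptions for illustration):

```python
import logging
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")

@dataclass
class SummaryView:
    # Stand-in for PipelineContext; field names are illustrative.
    run_id: str
    doc_type: str
    classification_confidence: float
    stage_timings: dict = field(default_factory=dict)
    failed_stages: list = field(default_factory=list)

def log_pipeline_summary(ctx: SummaryView) -> str:
    """Emit one aggregatable health line per pipeline run."""
    total = sum(ctx.stage_timings.values())
    line = (
        f"run={ctx.run_id} type={ctx.doc_type} "
        f"conf={ctx.classification_confidence:.2f} total={total:.3f}s "
        f"failed={','.join(ctx.failed_stages) or 'none'}"
    )
    logger.info(line)
    return line
```

Because every run emits exactly one line with a fixed key=value shape, a log management tool can aggregate on any field without custom parsing.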

⚠️ Common Mistake: Mistake 2 — Logging inside the stage functions at DEBUG level but forgetting to set the log level in your test environment. Your pipeline runs silently during development, you miss a subtle bug, and it surfaces in production. Set logging.basicConfig(level=logging.DEBUG) during development and use environment variables to control log verbosity in production.

Recognizing the Throughput Wall

The pipeline you have built processes documents sequentially: one document at a time, one stage at a time. For many workloads — interactive document review, low-volume compliance checking — this is perfectly adequate. But imagine you need to process 10,000 documents overnight. With an average pipeline time of 8 seconds per document, that is roughly 22 hours of sequential processing. A single overnight job becomes a three-night job.
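The back-of-envelope arithmetic, with a hypothetical worker count to preview the parallel payoff:

```python
docs = 10_000
seconds_per_doc = 8

# Sequential: every document waits for the previous one.
sequential_hours = docs * seconds_per_doc / 3600  # ~22.2 hours

# With, say, 50 concurrent workers (a hypothetical figure), the same
# batch fits comfortably inside a single overnight window.
workers = 50
parallel_hours = sequential_hours / workers
```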

Sequential processing of 5 documents:

Doc1: [Ingest]──[Classify]──[Enrich]──[Synthesize]
Doc2:                                               [Ingest]──[Classify]──[Enrich]──[Synthesize]
Doc3:                                                                                            [Ingest]──...

Timeline: ─────────────────────────────────────────────────────────────────────▶
          Each document waits for the previous one to finish

This is the throughput wall — the point at which sequential processing stops being an acceptable strategy. The wall appears because most of the pipeline's elapsed time is spent waiting: waiting for the LLM API to respond, waiting for network I/O, waiting for database queries. Your CPU is largely idle during these waits.

Parallel fan-out processing of 5 documents:

Doc1: [Ingest]──[Classify]──[Enrich]──[Synthesize]
Doc2: [Ingest]──[Classify]──[Enrich]──[Synthesize]
Doc3: [Ingest]──[Classify]──[Enrich]──[Synthesize]
Doc4: [Ingest]──[Classify]──[Enrich]──[Synthesize]
Doc5: [Ingest]──[Classify]──[Enrich]──[Synthesize]

Timeline: ─────────────────────────────────────────▶
          All documents processed concurrently

🤔 Did you know? When you make an HTTP request to an LLM API, your process typically blocks for 1–5 seconds waiting for a response. During that wait, a single-threaded program does nothing. Python's asyncio library lets you fire off dozens of these requests simultaneously and process results as they arrive, turning sequential dead time into concurrent throughput — without needing multiple CPU cores or threads.

The fan-out pattern — distributing work across multiple concurrent workers and collecting results — is the natural solution to the throughput wall. It is the subject of dedicated coverage in the child lessons that follow this one. The pipeline you have built here is the foundation: once you understand how a single document flows through all four stages, extending that to concurrent processing of many documents is a logical and learnable step.
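A minimal sketch of that fan-out using asyncio.gather, with a sleep standing in for the LLM API wait:

```python
import asyncio

async def process_document(doc_id: int) -> str:
    # Stand-in for the full four-stage pipeline; the sleep simulates
    # time spent waiting on an LLM API response.
    await asyncio.sleep(0.01)
    return f"doc{doc_id}: done"

async def fan_out(doc_ids: list[int]) -> list[str]:
    # gather starts every coroutine concurrently; total elapsed time is
    # roughly one API wait, not five, and result order matches input order.
    return await asyncio.gather(*(process_document(d) for d in doc_ids))

results = asyncio.run(fan_out([1, 2, 3, 4, 5]))
```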

🧠 Mnemonic: Think of sequential processing as a single checkout lane at a grocery store and fan-out as opening ten lanes simultaneously. The cashier (your LLM API) is no faster in either scenario — but customers (documents) stop waiting in line.

Structuring for Testability

Before moving on, it is worth pausing on testability — a quality that is easy to sacrifice when pipelines are built quickly. Because each stage is a pure function that accepts a PipelineContext and returns one, testing is straightforward. You can pre-populate the context with fixture data and assert on the fields the stage under test is responsible for, without standing up any external services.

For run_classification, you would use unittest.mock.patch to replace the OpenAI client call with a fixture response. For run_ingestion, no mocking is needed at all — it is pure string manipulation. For the router, you can test every routing decision by constructing a context with specific doc_type and classification_confidence values and asserting that select_enricher returns the expected function. This is a direct consequence of the design choice to make the router return a function rather than call it directly.
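A self-contained sketch of those router tests — the routing logic is reproduced inline (minus logging) so the example runs on its own:

```python
LOW_CONFIDENCE_THRESHOLD = 0.65

def enrich_legal(ctx): return ctx
def enrich_financial(ctx): return ctx
def enrich_general(ctx): return ctx

class FakeContext:
    # Minimal stand-in carrying only the fields the router reads.
    def __init__(self, doc_type, confidence):
        self.doc_type = doc_type
        self.classification_confidence = confidence
        self.completed_stages = {"classification"}

def select_enricher(ctx):
    # Same routing logic as the lesson's router, minus logging.
    assert "classification" in ctx.completed_stages
    if ctx.classification_confidence < LOW_CONFIDENCE_THRESHOLD:
        return enrich_general
    routing_table = {
        "legal": enrich_legal,
        "financial": enrich_financial,
        "general": enrich_general,
    }
    return routing_table.get(ctx.doc_type, enrich_general)

# Every routing decision becomes a one-line assertion:
assert select_enricher(FakeContext("legal", 0.9)) is enrich_legal
assert select_enricher(FakeContext("financial", 0.8)) is enrich_financial
assert select_enricher(FakeContext("legal", 0.3)) is enrich_general    # low confidence
assert select_enricher(FakeContext("unknown", 0.9)) is enrich_general  # fallback
```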

💡 Mental Model: Treat your pipeline like a series of pure mathematical functions. f(context) → context. A function that depends only on its inputs and produces only its outputs is trivially testable. Every side effect you hide inside a stage — writing to a database, calling an API without mocking — is a tax on your test suite's reliability.

Putting It All Together

You now have a complete, working reference implementation of a four-stage document analysis pipeline. The design choices made here — a structured context object, single-responsibility stages, explicit routing, uniform logging, and stage timing — are not arbitrary stylistic preferences. Each one addresses a specific failure mode that pipelines hit in production: state confusion, untestable spaghetti, silent routing errors, invisible bottlenecks, and undiagnosable failures.

This pipeline processes one document sequentially. As you move into the child lessons on prompt chaining and fan-out/fan-in patterns, you will take this same architecture and learn how to run many documents concurrently, how to break a single document's processing into parallel sub-tasks (for example, classifying while simultaneously checking a compliance database), and how to aggregate results from parallel workers back into a coherent output. The patterns are different in scale, not in kind. The mental model — nodes, edges, structured state — remains exactly the same.

Wrong thinking: "I'll optimize for parallelism from day one and add structure later."

Correct thinking: "I'll build the cleanest sequential pipeline first, then let real throughput constraints tell me where parallelism is worth the complexity cost."

The pipeline you have built is ready to evolve. The next lesson will show you exactly how.

Common Pitfalls and Anti-Patterns in Agent Pipelines

Building your first multi-step agent pipeline is exciting. The pieces click together, the stages hand off data cleanly, and the output looks reasonable. Then you deploy to production, and three days later you discover that under certain inputs the pipeline silently produces garbage — or worse, quietly halts while your application waits forever for a response that will never arrive.

This section is a field guide to the failure modes that experienced practitioners have learned the hard way. Understanding these anti-patterns before you hit them is the difference between a pipeline that breaks in your dev environment and one that breaks in front of your users. We will examine each pitfall with concrete examples, show you what the failure looks like in code, and give you actionable strategies to prevent or recover from each one.


Pitfall 1: State Mutation Bugs

State mutation bugs occur when one pipeline stage modifies shared context in a way that corrupts the data that downstream stages were expecting to read. Because multi-step pipelines pass a shared state object between nodes, any stage that writes carelessly to that object can silently overwrite data that another stage depends on.

Imagine a three-stage research pipeline where Stage 1 fetches raw documents, Stage 2 summarizes them, and Stage 3 formats a final report. Both Stage 2 and Stage 3 write to a key called content in the shared state. Stage 3 assumes content holds the summarized text from Stage 2, but Stage 2 was also expected to preserve the original documents under the same key for an audit log that a fourth monitoring stage reads. The monitoring stage now sees formatted output instead of raw documents — and no error is ever raised.

## ❌ Anti-pattern: stages mutate the same top-level key
def summarize_stage(state: dict) -> dict:
    # Overwrites whatever was in 'content' before
    state["content"] = summarize(state["content"])
    return state

def format_stage(state: dict) -> dict:
    # Also overwrites 'content', destroying the summary
    state["content"] = format_report(state["content"])
    return state

## ✅ Correct pattern: stages write to namespaced, stage-specific keys
def summarize_stage(state: dict) -> dict:
    # Reads from the raw input key, writes to its own output key
    state["summarize_output"] = summarize(state["raw_documents"])
    return state

def format_stage(state: dict) -> dict:
    # Reads from the summarizer's output, writes to its own key
    state["format_output"] = format_report(state["summarize_output"])
    return state

The fix is straightforward: treat state as an append-only ledger within a single pipeline run. Each stage reads from named upstream keys and writes to its own uniquely named output key. No stage should overwrite a key it did not itself create during this run.
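The append-only rule can even be enforced mechanically — a sketch of a dict subclass that turns a silent overwrite into a loud error:

```python
class AppendOnlyState(dict):
    """Refuses to overwrite keys set earlier in the same run."""
    def __setitem__(self, key, value):
        if key in self:
            raise KeyError(
                f"state key '{key}' already written; use a namespaced key"
            )
        super().__setitem__(key, value)

state = AppendOnlyState()
state["summarize_output"] = "summary text"
try:
    state["summarize_output"] = "oops"  # second write is a bug, not a merge
except KeyError as exc:
    error_message = str(exc)
```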

🎯 Key Principle: Design your state schema before you write a single stage. Define which keys each stage reads and which keys it writes. If two stages need to write to the same logical concept, namespace them — summarize.output and format.output rather than a shared output.

⚠️ Common Mistake: Mistake 1 — Using a single mutable dictionary passed by reference across all stages without cloning. In Python, dict objects are passed by reference, so a stage that does state["key"] = value is mutating the same object every other stage holds a pointer to. Defensively deep-copy the state at each stage boundary if your framework does not do this for you.
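A sketch of that defensive copy at the stage boundary:

```python
import copy

def run_stage_isolated(stage_fn, state: dict) -> dict:
    # Deep-copy at the boundary so a careless stage mutates only its
    # own private copy, never the caller's shared state.
    return stage_fn(copy.deepcopy(state))

def careless_stage(state: dict) -> dict:
    state["content"] = "overwritten"
    return state

original = {"content": "raw documents"}
result = run_stage_isolated(careless_stage, original)
# original is untouched; only the stage's private copy changed
```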


Pitfall 2: Over-Chaining — Sequential Pipelines That Should Be Parallel

Over-chaining is the habit of wiring steps together in a long sequential chain when those steps are actually independent of each other and could run simultaneously. This is one of the most common performance anti-patterns in agentic systems, and it is easy to fall into because sequential thinking is natural.

Consider a customer support pipeline that needs to: (1) classify the sentiment of a message, (2) identify the product category, (3) check the customer's account tier, and (4) retrieve relevant knowledge base articles. A developer who thinks sequentially writes this as four steps in a chain. But steps 1, 2, and 3 are all independent — none of them depends on the output of any other. Only step 4 arguably needs step 2's category to run a targeted search.

❌ Over-chained (sequential) — total latency ≈ 4×LLM_call_time

[Classify Sentiment] → [Identify Category] → [Check Account Tier] → [Retrieve KB Articles]
     ~800ms                  ~800ms                ~200ms                  ~600ms
                                                                  Total ≈ 2,400ms

✅ Parallel fan-out — total latency ≈ max(individual times)

              ┌─[Classify Sentiment]──┐ ~800ms
              │                       │
[Start] ──────┼─[Identify Category]───┼──→ [Retrieve KB Articles] → [End]
              │                       │         ~600ms
              └─[Check Account Tier]──┘ ~200ms
                                               Total ≈ 1,400ms

💡 Mental Model: Before wiring any two stages sequentially, ask one question: "Does Stage B need Stage A's output to begin?" If the answer is no, they should run in parallel. Only true data dependencies justify sequential ordering.

Over-chaining has a second, subtler form: unnecessary pass-through stages. This happens when a developer inserts a "validation" or "cleanup" stage that does nothing but reformat data slightly before handing it to the next step — work that could have been done in the producing stage or the consuming stage without a separate node.

🤔 Did you know? In production LLM pipelines, each network round-trip to the model API typically adds 300–1,500ms of latency depending on model size and load. A pipeline with six sequential LLM calls that could have been three parallel calls doubles the user-facing response time for no architectural reason.


Pitfall 3: Missing Error Boundaries

Error boundaries are explicit mechanisms that catch, classify, and handle failures at each stage transition so that one failing step cannot silently corrupt or permanently halt the entire pipeline. Pipelines without error boundaries behave like a chain of dominoes: one falling node takes everything downstream with it.

The failure mode takes two ugly forms. In the first form, an unhandled exception in Stage 3 of a five-stage pipeline raises all the way up to the top level, aborting the run. The user sees a generic 500 error. In the second, more insidious form, Stage 3 catches the exception internally but returns None or an empty string rather than raising — and Stages 4 and 5 dutifully process that empty value, producing a plausible-looking but completely wrong output with no warning raised.

import logging
from typing import Any, Optional

class StageResult:
    """Wraps stage output with explicit success/failure state."""
    def __init__(self, value: Any = None, error: Optional[Exception] = None):
        self.value = value
        self.error = error
        self.success = error is None

def run_stage_with_boundary(stage_fn, state: dict, stage_name: str) -> StageResult:
    """Execute a stage function and always return a StageResult."""
    try:
        output = stage_fn(state)
        return StageResult(value=output)
    except Exception as exc:
        logging.error(f"Stage '{stage_name}' failed: {exc}", exc_info=True)
        # Return a typed failure — never let None silently propagate
        return StageResult(error=exc)

def run_pipeline(stages: list, initial_state: dict) -> StageResult:
    state = initial_state.copy()
    for stage_name, stage_fn in stages:
        result = run_stage_with_boundary(stage_fn, state, stage_name)
        if not result.success:
            # Decide: abort, retry, or route to a fallback stage
            logging.warning(f"Pipeline halted at '{stage_name}'")
            return result  # Explicit failure, not silent corruption
        # Merge stage output into running state
        state.update(result.value)
    return StageResult(value=state)

This wrapper pattern does three important things. First, it ensures every stage transition is an explicit decision point: did this stage succeed or fail? Second, it attaches the stage name to every log entry, making post-mortem debugging dramatically easier. Third, it gives you a hook to implement retry logic, fallback stages, or graceful degradation at each boundary rather than only at the pipeline's outer edge.

🎯 Key Principle: Design your error strategy before you need it. For each stage, answer: if this stage fails, should the pipeline abort, retry with backoff, route to a fallback, or return a partial result? Encode that decision in the error boundary, not in ad-hoc try/except blocks scattered across your stage logic.
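One way to encode those per-stage decisions up front (the stage names and policy assignments here are hypothetical):

```python
from enum import Enum

class OnFailure(Enum):
    ABORT = "abort"
    RETRY = "retry"
    FALLBACK = "fallback"
    PARTIAL = "partial"

# Declarative policy map: the error boundary consults this instead of
# scattering try/except decisions through the stage logic.
FAILURE_POLICY = {
    "fetch_documents": OnFailure.RETRY,
    "classify_intent": OnFailure.FALLBACK,
    "generate_response": OnFailure.ABORT,
}

def policy_for(stage_name: str) -> OnFailure:
    # Unlisted stages abort by default — the safest assumption.
    return FAILURE_POLICY.get(stage_name, OnFailure.ABORT)
```

The error boundary then dispatches on `policy_for(stage_name)` after a failed StageResult, keeping every recovery decision in one reviewable place.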

⚠️ Common Mistake: Mistake 2 — Catching all exceptions in a stage and logging them without re-raising or returning a typed failure signal. This is the "swallowing exceptions" anti-pattern. Downstream stages receive None or a stale value, and the pipeline appears to succeed while producing nonsense output.

Pipeline health signal with error boundaries vs. without:

Without boundaries:
  Stage 1 ✓ → Stage 2 ✓ → Stage 3 ✗ (silent) → Stage 4 ✓ (on bad data) → Bad Output 😶

With boundaries:
  Stage 1 ✓ → Stage 2 ✓ → Stage 3 ✗ → [Error Boundary] → Fallback/Abort + Log 🛑

Pitfall 4: Prompt Leakage Between Steps

Prompt leakage happens when context from one stage bleeds into a subsequent stage's prompt in ways that are irrelevant, contradictory, or actively harmful to that stage's task. Because agent pipelines typically accumulate conversation history or pass raw LLM outputs forward as text, it is easy to accidentally carry along reasoning traces, intermediate hypotheses, or role instructions that confuse the next model call.

Consider a two-stage pipeline where Stage 1 uses a creative brainstorming persona ("You are a creative, unconstrained ideation assistant...") and Stage 2 performs rigorous fact-checking. If the full output of Stage 1 — including its system prompt artifacts and reasoning traces — is naively concatenated into Stage 2's input, the fact-checker now sees contradictory persona instructions and may adopt the brainstorming tone rather than its skeptical, verification-oriented role.

A more subtle form of prompt leakage is context bloat: each stage appends its full output to a growing context window, so by Stage 6 the model is reading thousands of tokens of intermediate reasoning that are irrelevant to the current task. This increases latency, cost, and the probability of the model "losing the thread" in the middle of the accumulated noise.

## ❌ Anti-pattern: naively concatenating all prior stage outputs
def build_prompt_naive(state: dict, current_task: str) -> str:
    history = "\n".join([
        f"Stage {k}: {v}" for k, v in state.items()
    ])
    return f"{history}\n\nCurrent task: {current_task}"

## ✅ Better pattern: surgically extract only what the current stage needs
def build_prompt_selective(state: dict, current_task: str, needed_keys: list) -> str:
    """
    Each stage declares exactly which upstream outputs it needs.
    Unrelated context is excluded from the prompt.
    """
    relevant_context = {
        k: state[k] for k in needed_keys if k in state
    }
    context_block = "\n".join([
        f"{k}: {v}" for k, v in relevant_context.items()
    ])
    return f"Context:\n{context_block}\n\nTask: {current_task}"

Every stage should declare its context contract: which keys from the upstream state it reads and in what format. This declaration serves as both documentation and as the basis for the selective prompt builder. Anything not in the context contract is excluded from the prompt, preventing irrelevant material from polluting the model's attention.

💡 Pro Tip: If your pipeline uses a conversation history format (e.g., a list of {role, content} message objects), resist the temptation to pass the full conversation forward to every stage. Instead, extract only the semantic output of the upstream stage — the answer, the classification, the structured data — and inject that as a clean context block in the next stage's system or user prompt.
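A sketch of that extraction for the common {role, content} message format — only the upstream stage's answer survives; the persona prompt and earlier turns are dropped:

```python
def extract_semantic_output(messages: list[dict]) -> str:
    """Return only the last assistant message — the upstream stage's
    actual answer — instead of the whole conversation."""
    for msg in reversed(messages):
        if msg.get("role") == "assistant":
            return msg.get("content", "")
    return ""

history = [
    {"role": "system", "content": "You are a creative ideation assistant."},
    {"role": "user", "content": "Brainstorm product names."},
    {"role": "assistant", "content": "Nimbus, Vector, Quill"},
]
clean_block = extract_semantic_output(history)  # persona prompt excluded
```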

⚠️ Common Mistake: Mistake 3 — Including the upstream stage's system prompt or persona instructions in the downstream stage's input. This is especially harmful when stages have opposing tones (creative vs. analytical) or conflicting role definitions. Always strip system-prompt artifacts before handing text between stages.

🧠 Mnemonic: Think of each stage as a clean room. Before a stage gets to work, you wipe down the surfaces — you bring in only the specific materials it needs, not the entire contents of the previous room.


Pitfall 5: Treating the Pipeline as a Black Box

The most corrosive long-term anti-pattern is treating a multi-step agent pipeline as a single indivisible unit that you only evaluate at its final output. When you skip per-step output validation, you lose the ability to pinpoint where failures originate, you accumulate subtle errors across stages, and you have no early-warning system before bad output reaches users.

Schema validation at each stage boundary is the first layer of defense. If Stage 2 is supposed to produce a JSON object with category, confidence, and reasoning fields, assert that those fields exist and have the expected types before Stage 3 starts. If they do not, you have a precise failure location rather than a mystery.

from pydantic import BaseModel, ValidationError
from typing import Literal

## Define the expected output schema for Stage 2
class ClassificationOutput(BaseModel):
    category: Literal["billing", "technical", "general", "escalate"]
    confidence: float  # Expected: 0.0 to 1.0
    reasoning: str

def validate_stage_output(raw_output: dict, schema: type[BaseModel], stage_name: str) -> BaseModel:
    """
    Validate raw stage output against a Pydantic schema.
    Raises a descriptive error rather than letting bad data propagate.
    """
    try:
        validated = schema(**raw_output)
        # Extra business-logic checks beyond schema types
        if hasattr(validated, 'confidence') and not (0.0 <= validated.confidence <= 1.0):
            raise ValueError(f"Confidence score {validated.confidence} out of range [0,1]")
        return validated
    except (ValidationError, ValueError) as exc:
        # Catch the range check above too, so every failure is wrapped
        # with the stage name rather than escaping raw.
        raise RuntimeError(
            f"Stage '{stage_name}' produced invalid output: {exc}"
        ) from exc

## Usage inside the pipeline orchestrator
raw = llm_call_stage_2(state)
validated_output = validate_stage_output(raw, ClassificationOutput, "classify_intent")
## Only now do we update state and proceed to Stage 3
state["classify_output"] = validated_output.model_dump()

Beyond schema validation, you should instrument each stage with observability hooks: log the stage name, input token count, output token count, latency, and a hash or preview of the output. This creates a per-run trace that lets you replay and inspect any individual stage in isolation.

Instrumented pipeline trace (example log output):

[PIPELINE RUN abc123]
  Stage 1 | fetch_documents    | status=ok  | latency=210ms  | out_tokens=1842
  Stage 2 | classify_intent    | status=ok  | latency=780ms  | out_tokens=87   | category=technical
  Stage 3 | retrieve_kb        | status=ok  | latency=320ms  | out_tokens=2104
  Stage 4 | generate_response  | status=FAIL| latency=1200ms | error=ValidationError: missing 'response'

With this trace, a developer can immediately see that Stage 4 failed, examine the input it received (which was the output of Stage 3), and reproduce the failure in isolation without re-running the entire pipeline.
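A sketch of per-stage instrumentation that produces a trace like the one above (token counting is omitted; a short hash serves as a cheap output fingerprint):

```python
import hashlib
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)

def instrumented(stage_name: str):
    """Decorator: log stage name, latency, and an output fingerprint."""
    def decorate(stage_fn):
        @wraps(stage_fn)
        def inner(state: dict) -> dict:
            t0 = time.perf_counter()
            out = stage_fn(state)
            latency_ms = (time.perf_counter() - t0) * 1000
            fingerprint = hashlib.sha256(repr(out).encode()).hexdigest()[:8]
            logging.info(
                "%s | status=ok | latency=%.0fms | out_hash=%s",
                stage_name, latency_ms, fingerprint,
            )
            return out
        return inner
    return decorate

@instrumented("classify_intent")
def classify(state: dict) -> dict:
    # Stand-in stage; a real one would call an LLM here.
    return {**state, "category": "technical"}

traced = classify({"text": "my invoice is wrong"})
```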

❌ Wrong thinking: "I'll add logging and validation later once the pipeline is working."

✅ Correct thinking: Validation and observability are structural features of the pipeline, not afterthoughts. Build them in from the first stage.

💡 Real-World Example: A document processing pipeline at a legal technology company was producing subtly wrong contract summaries. Because there was no per-step validation, the team spent three weeks debugging before discovering that Step 4 (entity extraction) was occasionally returning a list instead of a dict, causing Step 5 to silently use a Python list index as a dictionary key — resulting in partial data that looked superficially correct. A single Pydantic schema check at the Step 4 boundary would have surfaced this immediately.


Putting It All Together: A Failure Mode Quick Reference

📋 Quick Reference Card: Agent Pipeline Anti-Patterns

| Anti-Pattern | Symptom | Root Cause | Prevention |
|---|---|---|---|
| 🔄 State Mutation Bug | Downstream stages see stale or wrong data | Multiple stages write to the same state key | Namespaced, append-only state keys per stage |
| 🔗 Over-Chaining | Slow pipelines; high latency under load | Sequential wiring of independent stages | Fan-out parallel execution for independent steps |
| 🚧 Missing Error Boundaries | Silent failures; corrupt output; hung pipelines | No per-stage exception handling strategy | Typed StageResult wrappers with explicit abort/retry/fallback logic |
| 💬 Prompt Leakage | Inconsistent tone; confused model behavior; cost overruns | Full upstream output injected into downstream prompts | Context contracts; selective prompt builders |
| 📦 Black Box Pipelines | Hard-to-debug failures; subtle data corruption | No per-step validation or observability | Pydantic schema checks + instrumented trace logging at every boundary |

Developing a Defensive Pipeline Mindset

The thread running through all five pitfalls is the same: pipelines amplify assumptions. A single LLM call fails loudly and immediately. A five-stage pipeline can launder a bad assumption through four stages of plausible-looking intermediate results before producing an output that is subtly, expensively wrong.

The defensive pipeline mindset has three rules:

🔧 Make contracts explicit. Every stage should have a written (and ideally code-enforced) definition of its inputs, outputs, and error behavior. Implicit assumptions become explicit failures at the worst possible moment.

🎯 Fail fast and loudly. A pipeline that halts at Stage 3 with a clear error message is far better than one that completes all five stages while silently producing wrong output. Design your error boundaries to surface problems immediately rather than absorb them.

📚 Instrument everything from day one. You cannot debug what you cannot observe. Per-stage logging, schema validation, and latency tracking are not premature optimization — they are the foundation that makes every future debugging session hours shorter.

💡 Remember: The patterns in the next lessons — prompt chaining, fan-out/fan-in, ensemble routing — are powerful precisely because they compose many stages together. The pitfalls in this section scale with pipeline complexity. Catching them early, when your pipeline has three stages, is vastly easier than untangling them when it has twelve.

Key Takeaways and Preparing for the Next Level

You started this lesson with a single question: why isn't one prompt enough? By now, you have a concrete answer. Single-prompt AI calls are powerful but brittle — they collapse under complexity, fail to compose, and offer no visibility into intermediate reasoning. Multi-step agent pipelines solve this by breaking work into discrete, observable, and independently improvable units connected by explicit data flow. That shift from "one big ask" to "a graph of purposeful steps" is the foundational insight this lesson was built to deliver.

This final section does two things. First, it consolidates everything you have learned into a tightly organized reference you can return to whenever you start a new pipeline project. Second, it gives you a clear map of where the next two child lessons — Prompt Chaining and Fan-Out / Fan-In Workflows — will take you, so you walk into each one knowing exactly what gap it fills.


Recap of the Five Pipeline Primitives

Every agent pipeline, no matter how complex, is built from exactly five building blocks. Understanding each one precisely is what separates developers who debug pipelines quickly from those who spend hours chasing mysterious failures.

State is the shared memory of the pipeline. It is not a side effect or a cache — it is the primary medium through which nodes communicate. A node reads from state, does work, and writes back to state. If two nodes need to share information, that information must live in state. If state is ill-defined, the pipeline is ill-defined.

Nodes are the units of work. Each node has a single, well-named responsibility: classify an input, call a tool, summarize a document, validate a result. A node that does two things is a node that should be two nodes. The discipline of single-responsibility nodes is what makes pipelines testable in isolation.

Edges are the declared connections between nodes. A static edge says "after A, always go to B." Edges make the pipeline's possible paths explicit and auditable. When you draw your pipeline on a whiteboard, you are drawing edges.

Routers are conditional edges — functions that inspect the current state and return the name of the next node to execute. Routers are where business logic lives in a pipeline. A router that classifies intent sends the pipeline to a different node than one that checks a confidence threshold. Routers are powerful precisely because they are isolated: a routing bug is contained in one function, not scattered across the entire pipeline.

Execution modes determine whether nodes run one after another (sequential), only when a condition is true (conditional), or simultaneously (parallel). Most pipelines use all three modes in different parts of their graph. Choosing the wrong mode for a given set of nodes is one of the most common structural mistakes in pipeline design.
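The five primitives can be made concrete in a few lines of plain Python — no particular framework assumed. State is a TypedDict, nodes are functions over state, a router is a function that returns the next node's name, and the runner follows the edges. All names here are illustrative:

```python
from typing import TypedDict

class State(TypedDict, total=False):
    query: str
    intent: str
    answer: str

# Nodes: single-responsibility functions that read and write state.
def classify_intent(state: State) -> State:
    state["intent"] = "question" if state["query"].endswith("?") else "command"
    return state

def answer_question(state: State) -> State:
    state["answer"] = f"Answering: {state['query']}"
    return state

def run_command(state: State) -> State:
    state["answer"] = f"Executing: {state['query']}"
    return state

# Router: a conditional edge that inspects state and names the next node.
def route_after_classify(state: State) -> str:
    return "answer_question" if state["intent"] == "question" else "run_command"

NODES = {"answer_question": answer_question, "run_command": run_command}

def run_pipeline(state: State) -> State:
    state = classify_intent(state)           # static edge: always runs first
    next_node = route_after_classify(state)  # router edge: branches at runtime
    return NODES[next_node](state)
```

Notice that the routing bug surface is exactly one function: if the wrong branch runs, you test `route_after_classify` in isolation.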

Pipeline Primitive Relationships

  ┌─────────────────────────────────────────────────────┐
  │                     STATE                           │
  │   (shared, typed, versioned across all nodes)       │
  └───────────────────────┬─────────────────────────────┘
                          │ read / write
          ┌───────────────┼───────────────┐
          ▼               ▼               ▼
       [Node A]        [Node B]        [Node C]
          │               │               │
     static edge      router edge    parallel edge
          │               │               │
          ▼           branches        fan-out
       [Node D]      [D] or [E]     [D] + [E] + [F]
                                         │
                                       fan-in
                                      [Node G]

💡 Mental Model: Think of state as a shared whiteboard, nodes as specialists who walk up to it, do one thing, and walk away, and edges + routers as the office manager who decides who goes next.


Decision Guide: Choosing the Right Execution Pattern

One of the most valuable skills you can develop is the ability to look at a problem and immediately know which execution pattern fits. The table below encodes the heuristics that experienced pipeline engineers apply intuitively.

| 🔍 Pattern | 📋 Use When | ⚠️ Avoid When | ⏱️ Latency Profile |
| --- | --- | --- | --- |
| 🔗 Sequential Chain | Each step genuinely depends on the output of the previous step; order is semantically meaningful | Steps are independent and could run simultaneously; you are bottlenecked on latency | Additive — total time = sum of all steps |
| 🔀 Conditional Branch | The appropriate next action depends on a runtime classification or business rule | You are using branches to simulate loops — use an explicit cycle instead | Variable — only one branch executes |
| ⚡ Parallel (Fan-Out) | Multiple independent tasks can be executed simultaneously; latency matters | Tasks have hidden dependencies; shared state requires locking | Near-optimal — total time ≈ slowest task |
| 🔁 Loop / Cycle | A validator or critic determines that work must be retried or refined | No termination condition is defined; you risk infinite loops | Unbounded — must cap with max iterations |
| 🗳️ Ensemble / Fan-In | Multiple agents produce competing answers that need synthesis or voting | One agent is clearly authoritative; aggregation adds noise | Parallel execution + one aggregation step |

🎯 Key Principle: Start with sequential, then introduce branching where classification is needed, then introduce parallelism where independence is proven. Adding complexity before it is needed is the single biggest source of pipeline bugs.


Quick-Reference Checklist for Designing a New Agent Pipeline

Use this checklist every time you start a new pipeline project. It encodes the lessons from this entire course into a sequence of concrete decisions.

## This checklist is best read as code: each step is a real design question.

from dataclasses import dataclass

@dataclass
class PipelineSpec:
    """Illustrative container for the design artifact this checklist produces."""
    state_schema: dict
    nodes: list
    dependencies: dict
    routing_points: dict
    loop_guards: dict
    error_strategy: dict

def define_state_schema(inputs, outputs, intermediate):
    # Illustrative stand-in — in real code, use a TypedDict or Pydantic model.
    return {"inputs": inputs, "outputs": outputs, "intermediate": intermediate}

def design_pipeline_checklist(problem):
    """
    Walk through this before writing a single line of pipeline code.
    Each step catches a class of bugs before they are written.
    """

    # STEP 1: Define the state schema first
    # What data enters the pipeline? What data must exit?
    # What intermediate data must nodes share?
    # Use a TypedDict or dataclass — never a plain dict.
    state_schema = define_state_schema(
        inputs=["user_query", "conversation_history"],
        outputs=["final_answer", "sources"],
        intermediate=["intent", "retrieved_docs", "draft_answer", "validation_result"]
    )

    # STEP 2: List all responsibilities, one per node
    # If you write "and" in a node description, split it.
    nodes = [
        "classify_intent",       # routes to the right pipeline branch
        "retrieve_documents",    # calls vector store
        "generate_draft",        # calls LLM
        "validate_answer",       # checks quality gates
        "format_response"        # structures final output
    ]

    # STEP 3: Identify which steps depend on prior outputs
    # Independent steps are candidates for parallelism.
    dependencies = {
        "retrieve_documents": ["classify_intent"],
        "generate_draft": ["retrieve_documents"],
        "validate_answer": ["generate_draft"],
        "format_response": ["validate_answer"]
        # No node has parallel siblings here — all sequential
    }

    # STEP 4: Identify routing points
    # Where does the pipeline need to make a decision?
    routing_points = {
        "after_classify_intent": "route to 'retrieval' branch or 'direct_answer' branch",
        "after_validate_answer": "loop back to generate_draft OR proceed to format_response"
    }

    # STEP 5: Define termination conditions for any loops
    loop_guards = {
        "validation_loop": "max_iterations=3, escalate_on_failure=True"
    }

    # STEP 6: Define error handling per node
    error_strategy = {
        "retrieve_documents": "retry(max=2), fallback_to_empty_context",
        "generate_draft": "retry(max=1), raise_on_second_failure"
    }

    return PipelineSpec(state_schema, nodes, dependencies, routing_points, loop_guards, error_strategy)

This checklist catches the six most common categories of pipeline problems before you write a single node:

🧠 State schema first — catches undefined data flow before you wire nodes together.

📚 One responsibility per node — catches god-nodes that become impossible to debug.

🔧 Explicit dependency mapping — reveals parallelism opportunities and prevents race conditions.

🎯 Named routing points — prevents routing logic from leaking into node bodies.

🔒 Loop guards — prevents infinite cycles from reaching production.

📋 Per-node error strategy — prevents silent failures from corrupting downstream state.

⚠️ Common Mistake: Writing nodes first and state last. State is the contract between nodes. If you write nodes before defining the state schema, each node invents its own data format and you spend hours writing translation glue that should never exist.


Understanding What You Can Now Do That You Couldn't Before

📋 Quick Reference Card: Before and After This Lesson

| 🔍 Challenge | ❌ Before This Lesson | ✅ After This Lesson |
| --- | --- | --- |
| 🧩 Complex multi-step task | One large prompt, hoping the model handles structure | Explicit pipeline with nodes, state, and edges |
| 🔀 Runtime decisions | Prompt engineering tricks to get the model to "choose" | Dedicated router node with classification logic |
| 🐛 Debugging failures | Re-running the whole prompt and guessing | Isolating the failing node and testing it in isolation |
| ⚡ Slow sequential processing | Accept latency as unavoidable | Identify independent steps and fan them out |
| 🔁 Iterative refinement | Retry the entire prompt from scratch | Loop with a validator node and a capped max-iterations guard |
| 📊 Observability | No visibility into intermediate steps | State as an audit trail, node outputs as structured logs |
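The "loop with a validator node and a capped max-iterations guard" row is worth seeing as code. This is a hedged sketch in plain Python; `generate` and `validate` are stand-ins for a real LLM call and a real quality gate:

```python
def refine_with_guard(generate, validate, state, max_iterations=3):
    """Retry generation until validation passes or the guard trips."""
    for attempt in range(1, max_iterations + 1):
        draft = generate(state)
        if validate(draft):
            return draft, attempt            # success: exit the loop early
        state["last_failed_draft"] = draft   # feed the failure back into state
    # Fail fast and loudly rather than loop forever or return a bad draft.
    raise RuntimeError(f"validation failed after {max_iterations} attempts")
```

The guard guarantees termination, and the raised error surfaces the problem instead of silently passing a failing draft downstream.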

💡 Real-World Example: A production content moderation pipeline at a mid-sized SaaS company replaced a 2,400-token "do everything" prompt with a six-node pipeline: language detection → policy classification → severity routing → context retrieval → decision generation → audit logging. Accuracy improved by 23% because each node could be fine-tuned independently. More importantly, when a new policy was introduced, only the policy classification node needed to be updated — the rest of the pipeline was unchanged.


How Prompt Chaining Extends What You've Learned

The sequential pipeline patterns you encountered in this lesson — classify, retrieve, generate, validate, format — are the foundation of prompt chaining, but they only scratch the surface. The Prompt Chaining child lesson takes the sequential execution mode and transforms it into a precision instrument for multi-step reasoning.

The core insight of prompt chaining is that the output of one prompt is not just data passed to the next node — it is a carefully engineered input that constrains, focuses, and guides the next model call. This is different from simply passing state forward. A prompt chain is an intentional sequence where each link in the chain is designed to reduce the problem space for the next link.
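A minimal sketch makes that distinction visible. Here `call_llm` is a stand-in for a real model client (an assumption, not an actual API); the point is that the first link's output is deliberately embedded in the second link's prompt to shrink its problem space:

```python
def call_llm(prompt: str) -> str:
    """Stand-in for a real model call — illustrative only."""
    return f"<model output for: {prompt[:40]}...>"

def summarize_then_extract(document: str) -> str:
    # Link 1: reduce the problem space — produce a focused summary,
    # not a pass-through of the raw text.
    summary = call_llm(f"Summarize the key decisions in this document:\n{document}")
    # Link 2: the summary constrains and guides the extraction step.
    return call_llm(
        "From the summary below, list each decision as a single bullet.\n"
        f"Summary:\n{summary}"
    )
```

The second prompt never sees the raw document — only the engineered intermediate — which is what distinguishes a chain from mere state passing.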

Here is a preview of the patterns the child lesson covers:

Prompt Chaining Patterns (Preview)

1. DECOMPOSE → SOLVE → SYNTHESIZE
   ┌──────────┐    ┌───────────────┐    ┌────────────┐
   │ Decompose│───▶│ Solve each    │───▶│ Synthesize │
   │ complex  │    │ sub-problem   │    │ sub-answers│
   │ question │    │ sequentially  │    │ into one   │
   └──────────┘    └───────────────┘    └────────────┘

2. DRAFT → CRITIQUE → REVISE
   ┌───────┐    ┌─────────┐    ┌────────┐    ┌──────┐
   │ Draft │───▶│Critique │───▶│ Revise │───▶│Final │
   │       │    │(same or │    │        │    │      │
   │       │    │diff LLM)│    │        │    │      │
   └───────┘    └─────────┘    └────────┘    └──────┘

3. EXTRACT → TRANSFORM → VALIDATE
   Raw input ──▶ Structured data ──▶ Normalized form ──▶ Verified output

What you will learn in the Prompt Chaining lesson that this lesson only introduced:

🧠 Chain-of-thought scaffolding — how to structure intermediate prompts to elicit better reasoning before the final generation step.

📚 Output format contracts — how to use structured outputs (JSON schemas, Pydantic models) as the interface between chain links so parsing never fails.

🔧 Selective context injection — how to pass only the relevant slice of state to each prompt, keeping context windows lean and focused.

🎯 Self-consistency chains — how to run the same prompt multiple times and use majority voting to increase reliability on high-stakes decisions.

💡 Pro Tip: When you start the Prompt Chaining lesson, pay close attention to the section on output format contracts. The single most common bug in sequential chains is a downstream node receiving output in a format it didn't expect. Typed state schemas combined with structured LLM outputs eliminate this entire bug class.
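An output format contract can be sketched with nothing more than the standard library. In this hedged example (`IntentResult` and `parse_intent` are illustrative names), a dataclass defines the interface between two chain links, and parsing is validated at the boundary so format drift fails immediately rather than three stages downstream:

```python
import json
from dataclasses import dataclass

@dataclass(frozen=True)
class IntentResult:
    """Contract between the classify link and the generate link."""
    intent: str
    confidence: float

def parse_intent(raw_llm_output: str) -> IntentResult:
    # Validate at the boundary: fail loudly on any format drift.
    data = json.loads(raw_llm_output)
    result = IntentResult(intent=data["intent"],
                          confidence=float(data["confidence"]))
    if not 0.0 <= result.confidence <= 1.0:
        raise ValueError(f"confidence out of range: {result.confidence}")
    return result
```

In practice a library like Pydantic does this validation with less code, but the principle is the same: the downstream link consumes a typed object, never a raw string.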


How Fan-Out / Fan-In Fills the Parallelism Gap

This lesson introduced parallel execution as a concept — running independent nodes simultaneously to reduce latency — but it deliberately kept the treatment shallow. The reason is that parallel pipelines introduce a new category of problem that deserves its own dedicated treatment: coordination.

When you fan out to three parallel nodes, you immediately face questions this lesson did not fully answer: How do you wait for all three to complete before proceeding? What happens if one fails but the others succeed? How do you merge three independently produced results into one coherent state? How do you prevent parallel nodes from writing conflicting updates to shared state?

The Fan-Out / Fan-In Workflows child lesson is built entirely around these questions. Here is a preview of the territory it covers:

## Fan-Out / Fan-In Pattern Preview
## This is conceptual pseudocode — the child lesson covers
## the full implementation with real async coordination.

async def research_pipeline(state: ResearchState) -> ResearchState:
    """
    Fan out to three independent research agents.
    Fan in by merging their findings before synthesis.
    """
    # FAN-OUT: launch all three simultaneously
    academic_task   = research_agent(state, source="academic_papers")
    news_task       = research_agent(state, source="news_articles")
    internal_task   = research_agent(state, source="internal_docs")

    # COORDINATION: wait for all, handle partial failures
    results = await gather_with_fallback(
        tasks=[academic_task, news_task, internal_task],
        min_required=2,          # succeed if at least 2 complete
        timeout_seconds=15,      # don't wait forever
        failure_strategy="skip"  # skip failed branches, don't abort
    )

    # FAN-IN: merge results into unified state
    state["research_findings"] = merge_research_results(
        results,
        deduplication=True,
        conflict_resolution="most_recent"
    )

    return state
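The `gather_with_fallback` helper above is assumed, not a library function. Under plain `asyncio`, one plausible sketch of its core — wait with a timeout, cancel stragglers, skip failed branches, and enforce a minimum fan-in — looks like this (the function name and parameters mirror the pseudocode's assumptions):

```python
import asyncio

async def gather_with_fallback(coros, min_required=1, timeout_seconds=15.0):
    """Sketch of the coordination helper assumed in the pseudocode above."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, timeout=timeout_seconds)
    for task in pending:                 # don't wait forever: cancel stragglers
        task.cancel()
    successes = [t.result() for t in done if t.exception() is None]
    if len(successes) < min_required:    # minimum viable fan-in not met
        raise RuntimeError(
            f"fan-in failed: only {len(successes)} of {len(tasks)} branches succeeded"
        )
    return successes                     # skip failed branches, don't abort
```

The child lesson covers the harder parts this sketch glosses over: merging the surviving results into shared state and resolving conflicting writes.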

The patterns you will master in the Fan-Out / Fan-In lesson build directly on what you learned here about state management and node independence:

🧠 Scatter-gather — the foundational parallel pattern where work is distributed and then collected.

📚 Partial failure handling — how to define a "minimum viable fan-in" so the pipeline can proceed even when some parallel branches fail.

🔧 State merge strategies — how to resolve conflicts when two parallel nodes write to overlapping state keys.

🎯 Ensemble aggregation — how to run multiple LLM calls on the same input and synthesize their outputs for higher reliability.

🤔 Did you know? Fan-out pipelines can reduce effective latency by 60–80% on document analysis tasks where multiple sections of a large document can be processed simultaneously. The speedup grows roughly with the number of parallel branches, but total latency can never drop below that of the slowest branch.
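The claim above is simple arithmetic: sequential latency is the sum of branch latencies, while fan-out latency is the maximum. With three hypothetical branch latencies (illustrative values, not measurements):

```python
branch_latencies = [4.0, 3.5, 5.0]      # seconds per branch, illustrative

sequential = sum(branch_latencies)      # one after another: 12.5 s total
parallel = max(branch_latencies)        # fan-out: floored by the slowest branch

reduction = 1 - parallel / sequential   # fraction of latency eliminated
```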


The Big Picture: Where You Are in the Journey

Your Learning Path Through Multi-Step Agent Pipelines

  ╔══════════════════════════════════════════════╗
  ║  THIS LESSON (Complete)                      ║
  ║  ✅ Pipeline primitives (state, nodes, edges)║
  ║  ✅ Routing and conditional branching         ║
  ║  ✅ End-to-end pipeline construction         ║
  ║  ✅ Common pitfalls and anti-patterns        ║
  ╚══════════════════════════════════════════════╝
                        │
           ┌────────────┴────────────┐
           ▼                         ▼
  ╔═══════════════════╗     ╔═════════════════════╗
  ║ CHILD LESSON 1    ║     ║ CHILD LESSON 2      ║
  ║ Prompt Chaining   ║     ║ Fan-Out / Fan-In    ║
  ║                   ║     ║ Workflows           ║
  ║ → Chain design    ║     ║ → Parallel exec     ║
  ║ → Output contracts║     ║ → Coordination      ║
  ║ → Self-consistency║     ║ → Merge strategies  ║
  ║ → Draft-critique  ║     ║ → Partial failures  ║
  ╚═══════════════════╝     ╚═════════════════════╝
           │                         │
           └────────────┬────────────┘
                        ▼
  ╔══════════════════════════════════════════════╗
  ║  COMBINED MASTERY                            ║
  ║  Build production pipelines that are         ║
  ║  sequential where order matters,             ║
  ║  branching where decisions are needed,       ║
  ║  and parallel where independence allows.     ║
  ╚══════════════════════════════════════════════╝

🎯 Key Principle: The two child lessons are not alternatives — they are complements. Real production pipelines use prompt chaining for their sequential reasoning backbone and fan-out/fan-in for their parallel data gathering and ensemble decision-making. Mastering both is what enables you to build pipelines that are both accurate and fast.


Practical Next Steps

Leaving a lesson with knowledge is good. Leaving with a concrete action plan is better. Here are three things you can do immediately to reinforce and extend what you have learned:

1. Audit an existing AI feature using the pipeline primitives vocabulary. Take any AI feature you have built or are familiar with — even a simple chatbot — and describe it using the five primitives: state, nodes, edges, routers, execution modes. You will almost certainly find hidden routing logic buried in a node, undefined state being passed as function arguments, or sequential steps that could be parallelized. This audit exercise is the fastest way to internalize the mental model.

2. Build the checklist pipeline for a problem you care about. Take the design checklist from this section and apply it to a real problem — not a toy example. Define the state schema first. List the nodes with single-sentence responsibility descriptions. Map the dependencies. Identify the routing points. You do not need to implement it yet — the design artifact alone will be valuable when you enter the child lessons.

3. Read the Prompt Chaining child lesson with one question in mind. Before you start the next lesson, write down this question: "At which point in my checklist pipeline would a prompt chain replace a single node call, and what would that chain look like?" Having a concrete, personally relevant question to answer as you read will dramatically increase retention and transfer.

⚠️ Final critical point to remember: The sophistication of your pipeline architecture means nothing if your state schema is poorly defined. Every production pipeline failure that cannot be explained by an external API issue can ultimately be traced to a state management problem — either data that was never written, data that was overwritten by a concurrent node, or data that arrived in an unexpected format. Invest heavily in your state schema. It is the foundation everything else rests on.

🧠 Mnemonic: "SNEER"State first, Nodes single-purpose, Edges explicit, Errors handled per-node, Routers isolated. If your pipeline SNEERs at complexity, it will survive production.


Summary

You entered this lesson knowing that LLMs can answer questions. You leave it knowing how to build systems where LLMs do complex, multi-step work reliably. The distance between those two points is the distance between a demo and a production system.

The five primitives — state, nodes, edges, routers, and execution modes — are not just vocabulary. They are the conceptual toolkit that lets you decompose any agentic problem into a structure you can reason about, test, debug, and improve independently at each layer. The decision guide tells you when to use each pattern. The checklist tells you in what order to make each design decision. The pitfall catalog tells you where others have already learned painful lessons so you don't have to.

The two child lessons ahead are where this foundation gets built on. Prompt chaining will show you how sequential pipelines become precision reasoning instruments. Fan-out/fan-in will show you how parallel execution turns latency from a constraint into a design choice. Together, they complete the vocabulary you need to build production-grade agentic systems.

⚠️ The most important thing to carry forward: complexity in agentic systems is not a sign of sophistication. The best pipelines are the simplest ones that reliably solve the problem. Add nodes only when you have a responsibility that cannot belong to an existing node. Add parallelism only when you have proven independence. Add branching only when you have a genuine runtime decision to make. Simplicity is not a starting point you eventually graduate from — it is the permanent goal.