
Embedding Pipeline

Build scalable embedding generation with batching, caching, and model optimization for throughput.


Master the embedding pipeline with free flashcards and spaced repetition practice. This lesson covers text chunking strategies, vector embedding generation, and efficient batch processing: essential concepts for building modern retrieval-augmented generation (RAG) systems. Learn how to transform raw documents into searchable vector representations that power semantic search and AI assistants.

Welcome to the Embedding Pipeline

💻 The embedding pipeline is the backbone of any modern RAG system. It's the process that transforms your raw text documents into mathematical representations (vectors) that machines can understand and compare. Without a well-designed embedding pipeline, even the most sophisticated AI models will struggle to retrieve relevant information.

Think of embeddings as translating human language into a coordinate system where similar concepts cluster together. Words like "dog" and "puppy" end up close to each other in this mathematical space, while "dog" and "airplane" sit far apart. The embedding pipeline orchestrates this transformation at scale, handling everything from document ingestion to vector storage.

πŸ” Why does this matter? In 2026, the quality of your search results depends less on keyword matching and more on semantic understanding. A user searching for "affordable transportation" should find results about "budget-friendly cars" even though the exact words don't match. This semantic magic happens in the embedding pipeline.

Core Concepts

1. Document Ingestion and Preprocessing

📄 The pipeline begins with document ingestion: loading raw content from various sources (PDFs, web pages, databases, APIs). This stage handles format conversion and initial cleaning.

Key preprocessing steps:

Step | Purpose | Example
Format parsing | Extract text from structured formats | PDF → plain text, HTML → content extraction
Encoding normalization | Ensure consistent character encoding | UTF-8 standardization, handling special characters
Metadata extraction | Capture document properties | Title, author, date, source URL
Language detection | Identify document language | Route to language-specific processors

💡 Pro tip: Always preserve metadata during ingestion. Information like document date, author, and source can be crucial for filtering search results later.
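
A minimal ingestion sketch, assuming a local plain-text file, UTF-8 as the target encoding, and illustrative metadata fields (they are not any specific library's schema):

import unicodedata
from datetime import datetime, timezone
from pathlib import Path

def ingest_file(path: str, source_url: str = "") -> dict:
    """Load a file, normalize its encoding, and keep metadata alongside the text."""
    raw = Path(path).read_bytes()
    text = raw.decode("utf-8", errors="replace")   # tolerate stray bytes instead of failing
    text = unicodedata.normalize("NFC", text)      # consistent Unicode form for downstream hashing
    return {
        "text": text,
        "metadata": {
            "title": Path(path).stem,
            "source_url": source_url,
            "ingested_at": datetime.now(timezone.utc).isoformat(),
        },
    }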

2. Text Chunking Strategies

πŸ“ Chunking is the art of breaking long documents into smaller, meaningful pieces. This is critical because:

  • Embedding models have token limits (typically 512-8192 tokens)
  • Smaller chunks produce more precise retrieval results
  • Chunk size affects both relevance and context completeness

Common chunking strategies:

┌─────────────────────────────────────────────┐
│        CHUNKING STRATEGY DECISION TREE      │
└─────────────────────────────────────────────┘

          Is structure important?
                   │
        ┌──────────┴──────────┐
        │                     │
     ┌──┴──┐               ┌──┴──┐
     │ YES │               │ NO  │
     └──┬──┘               └──┬──┘
        │                     │
        ▼                     ▼
  ┌─────────────┐      ┌──────────────┐
  │ Semantic    │      │ Fixed-size   │
  │ (paragraphs,│      │ (tokens/     │
  │  sections)  │      │  chars)      │
  └─────────────┘      └──────────────┘
        │                     │
        ├─→ Add overlap? ←────┤
        │                     │
        ▼                     ▼
  ┌──────────────────────────────┐
  │  50-200 token overlap        │
  │  preserves context           │
  └──────────────────────────────┘

1. Fixed-size chunking:

  • Split every N tokens (e.g., 512 tokens)
  • Simple and predictable
  • ⚠️ May break sentences mid-thought

2. Semantic chunking:

  • Split by natural boundaries (paragraphs, sentences, sections)
  • Respects document structure
  • Variable chunk sizes

3. Sliding window chunking (a code sketch follows this list):

  • Fixed size with overlap (e.g., 512 tokens, 50 token overlap)
  • Ensures no context loss at boundaries
  • Creates redundancy (trade-off: storage vs. completeness)

4. Recursive chunking:

  • Start with large chunks (e.g., sections)
  • Subdivide if too large
  • Maintains hierarchy
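
Here is a minimal sliding-window chunker as a sketch of strategy 3. It assumes tiktoken's cl100k_base encoding as the tokenizer; in practice, match the tokenizer to the embedding model you use.

import tiktoken

def sliding_window_chunks(text: str, chunk_size: int = 512, overlap: int = 50) -> list:
    """Fixed-size token chunks with overlap between consecutive windows."""
    enc = tiktoken.get_encoding("cl100k_base")   # assumed tokenizer choice
    tokens = enc.encode(text)
    chunks = []
    step = chunk_size - overlap
    for start in range(0, len(tokens), step):
        window = tokens[start:start + chunk_size]
        chunks.append(enc.decode(window))
        if start + chunk_size >= len(tokens):    # last window already covers the tail
            break
    return chunks

Each chunk shares its last 50 tokens with the start of the next one, so a sentence that straddles a boundary still appears intact in at least one chunk.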

🧠 Memory device: Think "G.O.L.D.I.L.O.C.K.S" for chunk sizing:

  • Granular enough for precision
  • Overlap for context
  • Limited by token constraints
  • Document-structure aware
  • Iterable (test and adjust)
  • Large enough for meaning
  • Optimized for your use case
  • Consistent across corpus
  • Keep metadata attached
  • Smaller is often better

3. Text Enrichment and Augmentation

✨ Before embedding, enrich your chunks with additional context:

Context injection techniques:

Technique | What it adds | Benefit
Document title prepending | "[Title]: chunk text" | Adds document-level context to each chunk
Section headers | "Section: Introduction\nchunk text" | Preserves hierarchical structure
Synthetic questions | "Q: What is X?\nA: chunk text" | Improves retrieval for question queries
Summary prefix | "Summary: ...\nDetail: chunk text" | Dual-level representation

💡 Advanced technique: Use an LLM to generate a one-sentence summary of each chunk and prepend it. This creates a "hook" for retrieval while preserving detailed content.
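
A minimal sketch of context injection; the field names (title, section, summary) are illustrative and would come from your ingestion metadata:

def enrich_chunk(chunk: str, title: str, section: str = "", summary: str = "") -> str:
    """Prepend document-level context so each chunk embeds with its surroundings."""
    parts = [f"Title: {title}"]
    if section:
        parts.append(f"Section: {section}")
    if summary:
        parts.append(f"Summary: {summary}")
    parts.append(chunk)
    return "\n".join(parts)

## Example
print(enrich_chunk("It uses advanced algorithms.", title="AI Search Guide", section="Architecture"))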

4. Embedding Model Selection

🎯 The embedding model is your pipeline's core engine. In 2026, popular choices include:

Model comparison:

Model Family | Dimensions | Strengths | Use Case
OpenAI text-embedding-3 | 256-3072 | High quality, multilingual | General-purpose RAG
Cohere Embed v3 | 1024 | Excellent for search/clustering | E-commerce, documentation
sentence-transformers | 384-1024 | Open-source, customizable | Privacy-sensitive applications
BGE-large | 1024 | SOTA open-source performance | On-premise deployments
Domain-specific models | Varies | Optimized for a vertical (legal, medical) | Specialized knowledge bases

Key selection criteria:

  1. Dimension size: Higher dimensions = more expressive, but slower and more storage
  2. Token limit: Must accommodate your chunk size
  3. Language support: Ensure coverage for your content
  4. Latency: API-based vs. self-hosted performance
  5. Cost: Per-token pricing for cloud services
  6. Domain alignment: General vs. specialized training

⚠️ Common mistake: Using a model trained primarily on English for multilingual content. Always verify language capabilities!
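
To make the dimension trade-off concrete, here is a back-of-the-envelope estimate of raw vector storage (float32, before compression or index overhead); the 10-million-chunk corpus is an arbitrary example:

def index_size_gb(num_chunks: int, dims: int, bytes_per_value: int = 4) -> float:
    """Raw vector storage only; real indexes add overhead for graphs and metadata."""
    return num_chunks * dims * bytes_per_value / 1024**3

print(f"{index_size_gb(10_000_000, 1536):.1f} GB")  # ~57.2 GB at 1536 dims
print(f"{index_size_gb(10_000_000, 384):.1f} GB")   # ~14.3 GB at 384 dims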

5. Batch Processing and Optimization

⚡ Processing millions of documents requires smart batching strategies:

Batch processing pipeline:

┌────────────────────────────────────────────────┐
│        EFFICIENT BATCH PROCESSING FLOW         │
└────────────────────────────────────────────────┘

  📚 Document Queue (1M docs)
           │
           ▼
  ┌──────────────────┐
  │ Chunk Generator  │ → Yields chunks on-demand
  │ (streaming)      │   (memory efficient)
  └────────┬─────────┘
           │
           ▼
  ┌──────────────────┐
  │ Batch Accumulator│ → Collect N chunks
  │ (N=32-256)       │   (optimize API calls)
  └────────┬─────────┘
           │
           ▼
  ┌──────────────────┐
  │ Embedding Model  │ → Process batch in parallel
  │ (parallel)       │   (GPU utilization)
  └────────┬─────────┘
           │
           ▼
  ┌──────────────────┐
  │ Vector Database  │ → Bulk insert
  │ (bulk insert)    │   (transaction batching)
  └────────┬─────────┘
           │
           ▼
  ✅ Indexed Vectors

Optimization techniques:

1. Dynamic batching:

Batch size calculation:
- Start with small batch (N=32)
- Monitor throughput and memory
- Increase until diminishing returns
- Optimal batch ≈ 64-256 for most APIs

2. Parallel processing:

  • Multiple worker threads/processes
  • Each worker handles independent batches
  • Coordinate through queue (e.g., Redis, RabbitMQ)

3. Caching:

  • Cache embeddings for duplicate chunks
  • Use content hash as cache key
  • Reduces redundant API calls

4. Rate limiting:

  • Respect API quotas (e.g., 3000 requests/minute)
  • Implement exponential backoff for retries
  • Track tokens/second consumption

💡 Pro tip: For large corpora, process incrementally and save progress checkpoints. If processing fails, resume from the last checkpoint rather than starting over.
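
A minimal sketch of hash-based caching and checkpointing. The in-memory dict, the JSON checkpoint file, and the embed_batch callable are illustrative stand-ins; a production pipeline would typically back these with Redis or a database.

import hashlib
import json
from pathlib import Path

cache = {}                                       # content hash -> embedding vector
CHECKPOINT = Path("pipeline_checkpoint.json")    # hypothetical checkpoint location

def content_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def embed_with_cache(chunks, embed_batch):
    """Embed only unseen chunks; duplicates reuse the cached vector."""
    missing = [c for c in chunks if content_hash(c) not in cache]
    if missing:
        for chunk, vector in zip(missing, embed_batch(missing)):
            cache[content_hash(chunk)] = vector
    return [cache[content_hash(c)] for c in chunks]

def save_checkpoint(last_doc_id: str, processed_count: int) -> None:
    """Persist progress so a crashed run can resume instead of starting over."""
    CHECKPOINT.write_text(json.dumps({"last_doc_id": last_doc_id, "processed": processed_count}))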

6. Vector Normalization and Post-Processing

πŸ“ After embedding generation, normalize vectors for consistent similarity computation:

L2 normalization formula:

Original vector: v = [v₁, v₂, v₃, ..., vₙ]

Magnitude: ||v|| = √(v₁² + v₂² + v₃² + ... + vₙ²)

Normalized: v̂ = v / ||v||
            = [v₁/||v||, v₂/||v||, v₃/||v||, ..., vₙ/||v||]

Result: ||v̂|| = 1 (unit vector)

Why normalize?

  • Enables cosine similarity via dot product (faster computation)
  • Ensures consistent similarity scales
  • Most vector databases expect normalized vectors
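
A small numpy check of the claim above: once both vectors are unit length, the plain dot product equals cosine similarity.

import numpy as np

def l2_normalize(v: np.ndarray) -> np.ndarray:
    norm = np.linalg.norm(v)
    return v / norm if norm > 0 else v

a = np.array([1.0, 2.0, 3.0])
b = np.array([2.0, 4.0, 5.0])

cosine = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
dot_of_units = np.dot(l2_normalize(a), l2_normalize(b))
print(np.isclose(cosine, dot_of_units))  # True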

Post-processing steps:

Step | Purpose | Implementation
Dimensionality reduction | Reduce storage/compute costs | PCA, UMAP (optional)
Quantization | Compress vectors (1024 dims × 4 bytes → 1024 dims × 1 byte) | Product quantization, scalar quantization
Metadata attachment | Link vectors to source chunks | Store chunk_id, doc_id, position
Quality filtering | Remove low-quality embeddings | Flag chunks with extreme/outlier vectors
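
As a sketch of the scalar quantization row above: mapping float32 values to int8 cuts storage 4x at a small precision cost. Vector databases implement this internally, so treat this as an illustration of the idea rather than a step you normally hand-roll.

import numpy as np

def quantize_int8(v: np.ndarray):
    """Scale float32 values into the int8 range; keep the scale for dequantization."""
    scale = float(np.abs(v).max()) / 127.0 or 1.0   # avoid division by zero for all-zero vectors
    q = np.clip(np.round(v / scale), -127, 127).astype(np.int8)
    return q, scale

def dequantize(q: np.ndarray, scale: float) -> np.ndarray:
    return q.astype(np.float32) * scale

v = np.random.randn(1024).astype(np.float32)
q, scale = quantize_int8(v)
print(v.nbytes, "->", q.nbytes)   # 4096 -> 1024 bytes per vector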

7. Pipeline Orchestration and Error Handling

🔧 A production embedding pipeline must be robust and observable:

Pipeline architecture pattern:

┌────────────────────────────────────────────────┐
│           PRODUCTION PIPELINE STAGES           │
└────────────────────────────────────────────────┘

┌──────────┐   ┌──────────┐   ┌──────────┐
│ Ingest   │──→│ Process  │──→│ Embed    │
│          │   │          │   │          │
│ ✓ Retry  │   │ ✓ Validate│  │ ✓ Rate   │
│ ✓ DLQ    │   │ ✓ Enrich │   │   limit  │
└──────────┘   └──────────┘   └────┬─────┘
                                   │
                                   ▼
┌──────────┐   ┌──────────┐   ┌──────────┐
│ Monitor  │←──│ Store    │←──│ Validate │
│          │   │          │   │          │
│ ✓ Metrics│   │ ✓ Bulk   │   │ ✓ Dims   │
│ ✓ Alerts │   │   insert │   │ ✓ Format │
└──────────┘   └──────────┘   └──────────┘

DLQ = Dead Letter Queue (failed items)

Error handling strategies:

1. Retry logic:

Exponential backoff:
- Attempt 1: immediate
- Attempt 2: wait 1s
- Attempt 3: wait 2s
- Attempt 4: wait 4s
- After max retries → Dead Letter Queue

2. Circuit breakers:

  • Monitor API error rates
  • If errors exceed threshold (e.g., 50% failures), pause processing
  • Resume after cooldown period
  • Prevents cascading failures

3. Data validation:

Validation checkpoints:
✓ Chunk not empty
✓ Token count within limits
✓ Embedding dims match expected
✓ Vector not all zeros
✓ Metadata complete

4. Observability:

  • Metrics: chunks/second, API latency, error rate, queue depth
  • Logging: Sample processed chunks, error details, throughput stats
  • Tracing: Track individual documents through pipeline stages

🔔 Alert conditions:

  • Processing rate drops below threshold
  • Error rate exceeds 5%
  • Queue backlog exceeds capacity
  • API quota near exhaustion

8. Incremental Updates and Versioning

🔄 Real-world systems require continuous updates without full reprocessing:

Update strategies:

Strategy | When to use | Implementation
Append-only | New documents added frequently | Process new docs, add to index
Upsert | Documents updated/modified | Replace existing vectors by doc_id
Soft delete | Documents removed | Mark as deleted, filter from results
Versioning | Major corpus changes | Maintain multiple index versions

Change detection approaches:

  1. Timestamp-based: Process docs modified since last run
  2. Content hash: Compare hash to detect changes
  3. Event-driven: Trigger on document create/update/delete events
  4. Scheduled: Full reprocess on regular cadence (weekly/monthly)

💡 Hybrid approach: Use event-driven for hot data (frequently changing) and scheduled batches for cold data (historical content).
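
A minimal content-hash change detector, assuming you keep the previous run's hashes keyed by doc_id (how you persist that state is up to you):

import hashlib

def detect_changes(docs: dict, previous_hashes: dict):
    """Classify documents as new, modified, or deleted since the last run."""
    current = {doc_id: hashlib.sha256(text.encode("utf-8")).hexdigest()
               for doc_id, text in docs.items()}
    new = [d for d in current if d not in previous_hashes]
    modified = [d for d in current if d in previous_hashes and current[d] != previous_hashes[d]]
    deleted = [d for d in previous_hashes if d not in current]
    return new, modified, deleted

## New and modified docs get re-embedded and upserted; deleted docs get soft-deleted.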

Embedding model versioning:

When upgrading to a new embedding model:

Migration Strategy:

1. Create new index (vectors_v2)
2. Dual-write new documents to both indices
3. Background reprocess old documents → vectors_v2
4. Monitor progress: 0% ──────────────→ 100%
5. Switch reads to vectors_v2
6. Deprecate vectors_v1

Advantage: Zero downtime, easy rollback
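
A minimal dual-write sketch for step 2 of the migration; the index clients and their upsert method are hypothetical placeholders for your vector database SDK:

def dual_write(doc_id: str, text: str, index_v1, index_v2, embed_v1, embed_v2) -> None:
    """During migration, write every new or updated document to both index versions."""
    index_v1.upsert(doc_id, embed_v1(text))   # current model and index
    index_v2.upsert(doc_id, embed_v2(text))   # new model and index
    # Reads stay on index_v1 until the backfill into index_v2 completes.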

Examples

Example 1: Simple Python Embedding Pipeline

Here's a minimal embedding pipeline using the OpenAI API and basic chunking:

import openai
from typing import List, Dict
import hashlib

class EmbeddingPipeline:
    def __init__(self, model="text-embedding-3-small"):
        self.model = model
        self.chunk_size = 512  # target size, counted in words below as a rough proxy for tokens
        self.overlap = 50      # nominal overlap; approximated by carrying the last 3 sentences forward
    
    def chunk_text(self, text: str) -> List[str]:
        """Simple sentence-based chunking with overlap."""
        sentences = text.split('. ')
        chunks = []
        current_chunk = []
        current_length = 0
        
        for sentence in sentences:
            sentence_length = len(sentence.split())
            
            if current_length + sentence_length > self.chunk_size:
                # Save current chunk
                chunks.append('. '.join(current_chunk) + '.')
                # Start new chunk with overlap
                overlap_sentences = current_chunk[-3:] if len(current_chunk) > 3 else current_chunk
                current_chunk = overlap_sentences + [sentence]
                current_length = sum(len(s.split()) for s in current_chunk)
            else:
                current_chunk.append(sentence)
                current_length += sentence_length
        
        # Add final chunk
        if current_chunk:
            chunks.append('. '.join(current_chunk) + '.')
        
        return chunks
    
    def generate_embeddings(self, texts: List[str]) -> List[Dict]:
        """Batch embed texts with metadata."""
        results = []
        
        # Process in batches of 100 (API limit)
        for i in range(0, len(texts), 100):
            batch = texts[i:i+100]
            
            response = openai.embeddings.create(
                model=self.model,
                input=batch
            )
            
            for idx, embedding_obj in enumerate(response.data):
                chunk_text = batch[idx]
                results.append({
                    'text': chunk_text,
                    'vector': embedding_obj.embedding,
                    'chunk_id': hashlib.md5(chunk_text.encode()).hexdigest(),
                    'position': i + idx
                })
        
        return results
    
    def process_document(self, document: str, metadata: Dict) -> List[Dict]:
        """Full pipeline: chunk β†’ embed β†’ attach metadata."""
        # Step 1: Chunk
        chunks = self.chunk_text(document)
        
        # Step 2: Enrich with metadata
        enriched_chunks = [
            f"Document: {metadata.get('title', 'Untitled')}\n\n{chunk}"
            for chunk in chunks
        ]
        
        # Step 3: Embed
        embeddings = self.generate_embeddings(enriched_chunks)
        
        # Step 4: Attach original metadata
        for emb in embeddings:
            emb['metadata'] = metadata
        
        return embeddings

## Usage
pipeline = EmbeddingPipeline()
doc_text = """Your long document text here..."""
metadata = {'title': 'AI Search Guide', 'author': 'Jane Doe', 'date': '2026-01-15'}

results = pipeline.process_document(doc_text, metadata)
print(f"Generated {len(results)} embeddings")

Key features demonstrated:

  • Sentence-based chunking with overlap preservation
  • Batch processing to respect API limits
  • Content hashing for chunk deduplication
  • Metadata enrichment before embedding
  • Clean separation of concerns (chunk → embed → augment)

What's happening:

  1. Text split into ~512-token chunks at sentence boundaries
  2. Last 3 sentences overlap into next chunk (context preservation)
  3. Document title prepended to each chunk (context injection)
  4. Batches of 100 sent to embedding API (efficiency)
  5. Results include vectors + metadata for storage

Example 2: Advanced Semantic Chunking

This example uses semantic similarity to determine chunk boundaries:

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from typing import List, Tuple

class SemanticChunker:
    def __init__(self, model_name='all-MiniLM-L6-v2'):
        self.model = SentenceTransformer(model_name)
        self.similarity_threshold = 0.7
    
    def split_into_sentences(self, text: str) -> List[str]:
        """Basic sentence splitting (use spaCy for production)."""
        import re
        sentences = re.split(r'(?<=[.!?])\s+', text)
        return [s.strip() for s in sentences if s.strip()]
    
    def compute_sentence_embeddings(self, sentences: List[str]) -> np.ndarray:
        """Embed all sentences for comparison."""
        return self.model.encode(sentences)
    
    def find_chunk_boundaries(self, similarities: np.ndarray) -> List[int]:
        """Identify where similarity drops (topic shifts)."""
        boundaries = [0]  # Start of document
        
        for i in range(1, len(similarities)):
            # Compare consecutive sentence similarity
            if similarities[i-1][i] < self.similarity_threshold:
                boundaries.append(i)  # Topic shift detected
        
        boundaries.append(len(similarities))  # End of document
        return boundaries
    
    def create_semantic_chunks(self, text: str) -> List[str]:
        """Chunk based on semantic coherence."""
        sentences = self.split_into_sentences(text)
        
        if len(sentences) <= 1:
            return sentences
        
        # Embed sentences
        embeddings = self.compute_sentence_embeddings(sentences)
        
        # Compute pairwise similarities
        similarities = cosine_similarity(embeddings)
        
        # Find boundaries where similarity drops
        boundaries = self.find_chunk_boundaries(similarities)
        
        # Create chunks from boundaries
        chunks = []
        for i in range(len(boundaries) - 1):
            start_idx = boundaries[i]
            end_idx = boundaries[i + 1]
            chunk = ' '.join(sentences[start_idx:end_idx])
            chunks.append(chunk)
        
        return chunks

## Usage
chunker = SemanticChunker()
text = """
Artificial intelligence is transforming industries. Machine learning enables 
predictive analytics. Deep learning uses neural networks.

The weather today is sunny. Temperature will reach 75 degrees. Light winds 
from the west are expected.

Quantum computing represents a paradigm shift. Qubits enable superposition. 
Entanglement provides computational advantages.
"""

chunks = chunker.create_semantic_chunks(text)
for i, chunk in enumerate(chunks):
    print(f"\nChunk {i+1}:\n{chunk}")

Output explanation:

  • Chunk 1: AI/ML sentences (high semantic similarity)
  • Chunk 2: Weather sentences (topic shift detected)
  • Chunk 3: Quantum computing sentences (another topic shift)

Why this works: When consecutive sentences have low cosine similarity (<0.7), it indicates a topic change. This creates more meaningful chunks than arbitrary token counts.

Example 3: Production Pipeline with Error Handling

This example shows enterprise-grade orchestration:

import logging
import time
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
import redis
import json

class ProcessingStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRY = "retry"

@dataclass
class ProcessingMetrics:
    total_chunks: int = 0
    successful: int = 0
    failed: int = 0
    retries: int = 0
    start_time: float = 0
    
    def throughput(self) -> float:
        elapsed = time.time() - self.start_time
        return self.successful / elapsed if elapsed > 0 else 0

class ResilientEmbeddingPipeline:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        self.max_retries = 3
        self.batch_size = 64
        self.metrics = ProcessingMetrics()
        
    def process_with_retry(self, chunk: str, attempt: int = 1) -> Optional[Dict]:
        """Process single chunk with exponential backoff."""
        try:
            # Simulate embedding API call
            vector = self.generate_embedding(chunk)
            
            # Validate result
            if not self.validate_embedding(vector):
                raise ValueError("Invalid embedding dimensions")
            
            return {'text': chunk, 'vector': vector, 'status': 'success'}
            
        except Exception as e:
            self.logger.error(f"Attempt {attempt} failed: {str(e)}")
            
            if attempt < self.max_retries:
                # Exponential backoff
                wait_time = 2 ** (attempt - 1)
                time.sleep(wait_time)
                self.metrics.retries += 1
                return self.process_with_retry(chunk, attempt + 1)
            else:
                # Max retries exceeded → Dead Letter Queue
                self.send_to_dlq(chunk, str(e))
                self.metrics.failed += 1
                return None
    
    def validate_embedding(self, vector: List[float]) -> bool:
        """Validate embedding quality."""
        if len(vector) != 1536:  # Expected dimensions
            return False
        if all(v == 0 for v in vector):  # All zeros
            return False
        if any(abs(v) > 10 for v in vector):  # Outliers
            return False
        return True
    
    def send_to_dlq(self, chunk: str, error: str):
        """Send failed items to Dead Letter Queue."""
        dlq_item = {
            'chunk': chunk,
            'error': error,
            'timestamp': time.time(),
            'attempts': self.max_retries
        }
        self.redis.lpush('embedding_dlq', json.dumps(dlq_item))
        self.logger.warning(f"Sent to DLQ: {chunk[:50]}...")
    
    def process_batch(self, chunks: List[str]) -> List[Dict]:
        """Process batch with circuit breaker."""
        results = []
        failures = 0
        failure_threshold = len(chunks) * 0.5  # 50% failure = circuit open
        
        for idx, chunk in enumerate(chunks):
            result = self.process_with_retry(chunk)
            
            if result:
                results.append(result)
                self.metrics.successful += 1
            else:
                failures += 1
            
            # Circuit breaker
            if failures > failure_threshold:
                self.logger.critical("Circuit breaker triggered! Halting batch.")
                # Re-queue the chunks we haven't attempted yet
                remaining = chunks[idx + 1:]
                for c in remaining:
                    self.redis.lpush('embedding_queue', c)
                break
        
        return results
    
    def pipeline_orchestrator(self, document_chunks: List[str]):
        """Main orchestration with monitoring."""
        self.metrics.start_time = time.time()
        self.metrics.total_chunks = len(document_chunks)
        
        # Process in batches
        for i in range(0, len(document_chunks), self.batch_size):
            batch = document_chunks[i:i + self.batch_size]
            
            self.logger.info(f"Processing batch {i//self.batch_size + 1}")
            results = self.process_batch(batch)
            
            # Store results (vector database insertion)
            self.bulk_insert(results)
            
            # Log metrics
            throughput = self.metrics.throughput()
            self.logger.info(f"Throughput: {throughput:.2f} chunks/sec")
            self.logger.info(f"Success rate: {self.metrics.successful}/{self.metrics.total_chunks}")
            
            # Alert if performance degrades
            if throughput < 10:  # Threshold: 10 chunks/sec
                self.logger.warning("Performance degradation detected!")
    
    def generate_embedding(self, text: str) -> List[float]:
        """Placeholder for actual embedding generation."""
        # In production: call OpenAI, Cohere, etc.
        return [0.1] * 1536
    
    def bulk_insert(self, results: List[Dict]):
        """Placeholder for vector database insertion."""
        # In production: Pinecone, Weaviate, Qdrant, etc.
        pass

Production features demonstrated:

  • ✅ Exponential backoff retry logic
  • ✅ Dead Letter Queue for permanent failures
  • ✅ Circuit breaker to prevent cascading failures
  • ✅ Real-time metrics (throughput, success rate)
  • ✅ Batch processing optimization
  • ✅ Logging and alerting integration
  • ✅ Embedding validation

Example 4: Streaming Pipeline for Large Corpora

For datasets too large for memory, use streaming processing:

import itertools
import json
import multiprocessing as mp
from typing import Dict, Iterator, Generator, List

class StreamingPipeline:
    def __init__(self, num_workers=4):
        self.num_workers = num_workers
    
    def document_generator(self, file_path: str) -> Generator[str, None, None]:
        """Stream documents from large file without loading all into memory."""
        with open(file_path, 'r') as f:
            current_doc = []
            for line in f:
                line = line.strip()
                if line == "---DOC_SEPARATOR---":
                    if current_doc:
                        yield '\n'.join(current_doc)
                        current_doc = []
                else:
                    current_doc.append(line)
            
            # Yield final document
            if current_doc:
                yield '\n'.join(current_doc)
    
    def chunk_generator(self, documents: Iterator[str]) -> Generator[str, None, None]:
        """Stream chunks from document stream."""
        for doc in documents:
            chunks = self.chunk_document(doc)
            for chunk in chunks:
                yield chunk
    
    def chunk_document(self, doc: str, chunk_size=512) -> List[str]:
        """Simple chunking logic."""
        words = doc.split()
        chunks = []
        for i in range(0, len(words), chunk_size):
            chunk = ' '.join(words[i:i+chunk_size])
            chunks.append(chunk)
        return chunks
    
    def parallel_embed_worker(self, chunk: str) -> Dict:
        """Worker function for parallel processing."""
        # Simulate embedding generation
        vector = [0.1] * 1536
        return {'text': chunk, 'vector': vector}
    
    def stream_process(self, file_path: str, output_path: str):
        """Main streaming pipeline with parallel workers."""
        # Create document stream
        docs = self.document_generator(file_path)
        
        # Create chunk stream
        chunks = self.chunk_generator(docs)
        
        # Process in parallel with worker pool
        with mp.Pool(self.num_workers) as pool:
            # Process chunks in batches of 100
            batch_size = 100
            batch_count = 0
            
            while True:
                # Get next batch
                batch = list(itertools.islice(chunks, batch_size))
                if not batch:
                    break
                
                # Process batch in parallel
                results = pool.map(self.parallel_embed_worker, batch)
                
                # Stream results to output (e.g., vector DB)
                self.write_batch(results, output_path)
                
                batch_count += 1
                print(f"Processed batch {batch_count} ({len(results)} chunks)")
    
    def write_batch(self, results: List[Dict], output_path: str):
        """Write results to output (simplified for example)."""
        with open(output_path, 'a') as f:
            for result in results:
                f.write(json.dumps(result) + '\n')

## Usage for processing 10GB+ corpora
pipeline = StreamingPipeline(num_workers=8)
pipeline.stream_process(
    file_path='massive_corpus.txt',
    output_path='embeddings.jsonl'
)

Key streaming advantages:

  • 💾 Constant memory usage regardless of corpus size
  • ⚡ Parallel processing across multiple CPU cores
  • 📊 Progress tracking with batch counting
  • 🔄 Incremental output (results written immediately)

Memory comparison:

Traditional approach:
- Load 10GB file → 10GB RAM
- Chunk all docs → 12GB RAM
- Generate embeddings → 15GB RAM
Total: 15GB+ peak memory

Streaming approach:
- Process one batch → 100MB RAM
- Workers active → 500MB RAM
- Output buffer → 50MB RAM
Total: 650MB peak memory

Common Mistakes

⚠️ 1. Chunks too large or too small

Problem: Using fixed 2000-token chunks that exceed model limits, or 50-token chunks that lack context.

Solution: Test different chunk sizes (256, 512, 1024 tokens) and measure retrieval quality. Sweet spot is usually 512-1024 tokens for general content.

Example of bad chunking:

Chunk: "The company" (2 words) ❌
Vs.
Chunk: "The company announced a new AI initiative focusing on sustainable 
technology solutions for enterprise customers in the healthcare sector..." 
(512 tokens) ✅
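
One lightweight way to compare chunk sizes is recall@k over a small labeled query set; the search function here is a hypothetical stand-in for your vector store's query call and is assumed to return dicts with a chunk_id field:

def recall_at_k(queries, relevant_ids, search, k: int = 5) -> float:
    """Fraction of queries whose top-k results contain at least one relevant chunk.

    relevant_ids: one set of relevant chunk_ids per query.
    """
    hits = 0
    for query, relevant in zip(queries, relevant_ids):
        top_ids = {result["chunk_id"] for result in search(query, k=k)}
        if top_ids & relevant:
            hits += 1
    return hits / len(queries) if queries else 0.0

## Build indices with 256-, 512-, and 1024-token chunks and run the same evaluation set against each.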

⚠️ 2. Ignoring chunk overlap

Problem: Hard boundaries cause context loss. A sentence like "This solution addresses the problem mentioned earlier" loses meaning if "the problem" is in the previous chunk.

Solution: Always use 10-20% overlap between chunks (50-200 tokens).

⚠️ 3. Not normalizing vectors

Problem: Comparing raw embedding vectors leads to inconsistent similarity scores and slower computation.

Solution: Always L2-normalize vectors after generation:

import numpy as np

def normalize_vector(v):
    norm = np.linalg.norm(v)
    return v / norm if norm > 0 else v

⚠️ 4. Embedding without context enrichment

Problem: Embedding chunks without document-level context.

Bad: "It uses advanced algorithms" (What uses? What document?)

Good: "Title: AI Search Guide | Section: Architecture | It uses advanced algorithms"

⚠️ 5. Ignoring language differences

Problem: Using an English-trained model for multilingual content.

Solution: Use multilingual models (e.g., multilingual-e5-large) or language-specific models for best results.

⚠️ 6. No deduplication

Problem: Embedding duplicate chunks wastes tokens and storage.

Solution: Use content hashing to detect and skip duplicates:

import hashlib

seen_hashes = set()
for chunk in chunks:
    chunk_hash = hashlib.md5(chunk.encode()).hexdigest()
    if chunk_hash not in seen_hashes:
        process(chunk)
        seen_hashes.add(chunk_hash)

⚠️ 7. Batch size too small

Problem: Processing one chunk at a time → 1000x API calls instead of 10x.

Solution: Batch 50-100 chunks per API call (check your provider's limits).

⚠️ 8. No checkpointing

Problem: Pipeline crashes after 6 hours of processing → start over from scratch.

Solution: Save progress every N documents:

if processed_count % 1000 == 0:
    save_checkpoint(processed_count, last_doc_id)

⚠️ 9. Metadata loss

Problem: Embedding chunks but forgetting to link them back to source documents.

Solution: Always store: doc_id, chunk_id, position, source_url, timestamp.

⚠️ 10. Ignoring model versioning

Problem: Upgrading embedding models without reprocessing → mixing incompatible vector spaces.

Solution: When changing models, either:

  • Reprocess entire corpus (small datasets)
  • Maintain separate indices per model version (large datasets)
  • Use gradual migration strategy (dual-write approach)

Key Takeaways

🎯 Essential principles for embedding pipelines:

  1. Chunking is an art and a science → Test different strategies (fixed, semantic, recursive) and measure retrieval quality

  2. Context is everything → Enrich chunks with document titles, section headers, and surrounding context before embedding

  3. Batch aggressively → Process 50-256 chunks per API call to minimize latency and cost

  4. Normalize all vectors → L2 normalization enables faster similarity computation and consistent scoring

  5. Build for failure → Implement retries, circuit breakers, dead letter queues, and checkpointing

  6. Stream when possible → Process large corpora without loading everything into memory

  7. Monitor everything → Track throughput, error rates, queue depths, and API quotas

  8. Version your models → Maintain index versions when upgrading embedding models

  9. Optimize incrementally → Start simple, measure performance, then optimize bottlenecks

  10. Test your retrievals → The best pipeline produces embeddings that retrieve relevant results: measure precision and recall

📋 Quick Reference Card: Embedding Pipeline Checklist

Stage | Key Actions
📥 Ingestion | Parse formats, normalize encoding, extract metadata
✂️ Chunking | Choose strategy (fixed/semantic), add overlap (50-200 tokens)
✨ Enrichment | Prepend titles, add section headers, inject context
🎯 Embedding | Select model, batch requests (50-256), respect rate limits
📏 Post-processing | L2 normalize, validate dimensions, attach metadata
💾 Storage | Bulk insert, index vectors, maintain mappings
🔍 Validation | Test retrieval quality, check edge cases
📊 Monitoring | Track throughput, error rates, queue depth

Optimal chunk sizes by content type:

Content Type | Recommended Size
Technical docs | 512-768 tokens
News articles | 256-512 tokens
Legal documents | 768-1024 tokens
Chat transcripts | 256-384 tokens
Product descriptions | 128-256 tokens

📚 Further Study

Essential resources for deepening your understanding:

  • Vector Database Documentation
  • Sentence Transformers Library
  • LangChain Text Splitters


Next steps: Now that you understand embedding pipelines, practice implementing different chunking strategies on your own documents. Experiment with overlap sizes and measure how they affect retrieval quality. The best way to master this is through hands-on experimentation with real data!