Files
2025-11-30 08:51:46 +08:00

13 KiB

name, description, category
name description category
rag-design-patterns Automatically applies when building RAG (Retrieval Augmented Generation) systems. Ensures proper chunking strategies, vector database patterns, embedding management, reranking, and retrieval optimization. ai-llm

RAG Design Patterns

When building Retrieval Augmented Generation systems, follow these patterns for effective, scalable retrieval.

Trigger Keywords: RAG, retrieval, vector database, embeddings, chunking, vector search, semantic search, reranking, vector store, FAISS, Pinecone, Qdrant, ChromaDB, retrieval augmented

Agent Integration: Used by ml-system-architect, rag-architect, llm-app-engineer, performance-and-cost-engineer-llm

Correct Pattern: Document Chunking

from typing import List
from pydantic import BaseModel, Field


class Chunk(BaseModel):
    """A document chunk with metadata."""
    id: str
    content: str
    start_char: int
    end_char: int
    document_id: str
    metadata: dict = Field(default_factory=dict)


class ChunkingStrategy:
    """Base class for chunking strategies."""

    def chunk(self, text: str, document_id: str) -> List[Chunk]:
        """Split text into chunks."""
        raise NotImplementedError


class SemanticChunker(ChunkingStrategy):
    """Chunk by semantic boundaries (paragraphs, sections)."""

    def __init__(
        self,
        max_chunk_size: int = 512,
        overlap: int = 50,
        split_on: List[str] = None
    ):
        self.max_chunk_size = max_chunk_size
        self.overlap = overlap
        self.split_on = split_on or ["\n\n", "\n", ". "]

    def chunk(self, text: str, document_id: str) -> List[Chunk]:
        """Chunk text at semantic boundaries."""
        chunks = []
        current_pos = 0

        # Split by semantic boundaries first
        for delimiter in self.split_on:
            if delimiter in text:
                segments = text.split(delimiter)
                break
        else:
            segments = [text]

        current_chunk = ""
        chunk_start = 0

        for segment in segments:
            if len(current_chunk) + len(segment) <= self.max_chunk_size:
                current_chunk += segment
            else:
                if current_chunk:
                    chunks.append(Chunk(
                        id=f"{document_id}-{len(chunks)}",
                        content=current_chunk,
                        start_char=chunk_start,
                        end_char=chunk_start + len(current_chunk),
                        document_id=document_id
                    ))
                current_chunk = segment
                chunk_start = chunk_start + len(current_chunk) - self.overlap

        # Add final chunk
        if current_chunk:
            chunks.append(Chunk(
                id=f"{document_id}-{len(chunks)}",
                content=current_chunk,
                start_char=chunk_start,
                end_char=chunk_start + len(current_chunk),
                document_id=document_id
            ))

        return chunks

Vector Database Integration

import numpy as np
from typing import List, Optional
import asyncio


class VectorStore:
    """Abstract vector database interface."""

    async def add_chunks(
        self,
        chunks: List[Chunk],
        embeddings: np.ndarray
    ):
        """Add chunks with their embeddings."""
        raise NotImplementedError

    async def search(
        self,
        query_embedding: np.ndarray,
        top_k: int = 5,
        filter: Optional[dict] = None
    ) -> List[tuple[Chunk, float]]:
        """Search for similar chunks."""
        raise NotImplementedError


class EmbeddingModel:
    """Generate embeddings for text."""

    async def embed(self, texts: List[str]) -> np.ndarray:
        """
        Generate embeddings for texts.

        Args:
            texts: List of text strings

        Returns:
            Array of embeddings, shape (len(texts), embedding_dim)
        """
        # Use OpenAI, Anthropic, or local model
        raise NotImplementedError


class RAGPipeline:
    """End-to-end RAG pipeline."""

    def __init__(
        self,
        chunker: ChunkingStrategy,
        embedder: EmbeddingModel,
        vector_store: VectorStore
    ):
        self.chunker = chunker
        self.embedder = embedder
        self.vector_store = vector_store

    async def index_document(
        self,
        text: str,
        document_id: str,
        metadata: Optional[dict] = None
    ):
        """Index a document into vector store."""
        # Chunk document
        chunks = self.chunker.chunk(text, document_id)

        # Add metadata
        if metadata:
            for chunk in chunks:
                chunk.metadata.update(metadata)

        # Generate embeddings
        texts = [c.content for c in chunks]
        embeddings = await self.embedder.embed(texts)

        # Store in vector DB
        await self.vector_store.add_chunks(chunks, embeddings)

    async def retrieve(
        self,
        query: str,
        top_k: int = 5,
        filter: Optional[dict] = None
    ) -> List[tuple[Chunk, float]]:
        """Retrieve relevant chunks for query."""
        # Embed query
        query_embedding = await self.embedder.embed([query])

        # Search vector store
        results = await self.vector_store.search(
            query_embedding[0],
            top_k=top_k,
            filter=filter
        )

        return results

Reranking for Improved Precision

from typing import List, Tuple


class Reranker:
    """Rerank retrieved chunks for better relevance."""

    async def rerank(
        self,
        query: str,
        chunks: List[Chunk],
        top_k: int = 5
    ) -> List[Tuple[Chunk, float]]:
        """
        Rerank chunks using cross-encoder or LLM.

        Args:
            query: Search query
            chunks: Retrieved chunks
            top_k: Number of top results to return

        Returns:
            Reranked chunks with new scores
        """
        # Use cross-encoder model or LLM for reranking
        scores = await self._compute_relevance_scores(query, chunks)

        # Sort by score
        ranked = sorted(
            zip(chunks, scores),
            key=lambda x: x[1],
            reverse=True
        )

        return ranked[:top_k]

    async def _compute_relevance_scores(
        self,
        query: str,
        chunks: List[Chunk]
    ) -> List[float]:
        """Compute relevance scores."""
        # Implementation using cross-encoder or LLM
        raise NotImplementedError


class RAGWithReranking(RAGPipeline):
    """RAG pipeline with reranking."""

    def __init__(self, *args, reranker: Reranker, **kwargs):
        super().__init__(*args, **kwargs)
        self.reranker = reranker

    async def retrieve(
        self,
        query: str,
        initial_k: int = 20,
        final_k: int = 5,
        filter: Optional[dict] = None
    ) -> List[tuple[Chunk, float]]:
        """Retrieve with reranking."""
        # Get initial results (over-retrieve)
        initial_results = await super().retrieve(
            query,
            top_k=initial_k,
            filter=filter
        )

        # Extract chunks
        chunks = [chunk for chunk, _ in initial_results]

        # Rerank
        reranked = await self.reranker.rerank(query, chunks, top_k=final_k)

        return reranked

Query Rewriting and Expansion

class QueryRewriter:
    """Rewrite queries for better retrieval."""

    async def rewrite(self, query: str) -> List[str]:
        """
        Generate multiple query variations.

        Args:
            query: Original query

        Returns:
            List of query variations
        """
        # Use LLM to generate variations
        prompt = f"""Generate 3 variations of this query for better search:

Original: {query}

Variations (one per line):"""

        response = await llm_complete(prompt)
        variations = [
            line.strip()
            for line in response.strip().split("\n")
            if line.strip()
        ]

        return [query] + variations  # Include original


class HybridRetriever:
    """Combine multiple retrieval strategies."""

    def __init__(
        self,
        vector_store: VectorStore,
        embedder: EmbeddingModel,
        query_rewriter: QueryRewriter
    ):
        self.vector_store = vector_store
        self.embedder = embedder
        self.query_rewriter = query_rewriter

    async def retrieve(
        self,
        query: str,
        top_k: int = 5
    ) -> List[tuple[Chunk, float]]:
        """Hybrid retrieval with query expansion."""
        # Generate query variations
        queries = await self.query_rewriter.rewrite(query)

        # Retrieve for each variation
        all_results = []
        for q in queries:
            results = await self._retrieve_for_query(q, top_k=top_k * 2)
            all_results.extend(results)

        # Deduplicate and merge scores
        merged = self._merge_results(all_results)

        # Return top k
        return sorted(merged, key=lambda x: x[1], reverse=True)[:top_k]

    def _merge_results(
        self,
        results: List[tuple[Chunk, float]]
    ) -> List[tuple[Chunk, float]]:
        """Merge and deduplicate results."""
        chunk_scores = {}
        for chunk, score in results:
            if chunk.id in chunk_scores:
                # Average or max score
                chunk_scores[chunk.id] = max(chunk_scores[chunk.id], score)
            else:
                chunk_scores[chunk.id] = score

        return [(chunk, chunk_scores[chunk.id]) for chunk, _ in results
                if chunk.id in chunk_scores]

Context Assembly for LLM

class ContextBuilder:
    """Build context from retrieved chunks."""

    def build_context(
        self,
        chunks: List[Chunk],
        max_tokens: int = 4000,
        include_metadata: bool = True
    ) -> str:
        """
        Assemble context from chunks within token limit.

        Args:
            chunks: Retrieved chunks
            max_tokens: Maximum context tokens
            include_metadata: Include chunk metadata

        Returns:
            Formatted context string
        """
        context_parts = []
        total_tokens = 0

        for i, chunk in enumerate(chunks):
            # Format chunk
            chunk_text = f"[Source {i+1}]\n{chunk.content}\n"

            if include_metadata and chunk.metadata:
                meta_str = ", ".join(
                    f"{k}: {v}" for k, v in chunk.metadata.items()
                )
                chunk_text += f"Metadata: {meta_str}\n"

            # Estimate tokens (rough: 1 token ≈ 4 chars)
            chunk_tokens = len(chunk_text) // 4

            if total_tokens + chunk_tokens > max_tokens:
                break

            context_parts.append(chunk_text)
            total_tokens += chunk_tokens

        return "\n".join(context_parts)


async def rag_complete(
    query: str,
    pipeline: RAGPipeline,
    context_builder: ContextBuilder
) -> str:
    """Complete RAG workflow."""
    # Retrieve
    chunks, scores = zip(*await pipeline.retrieve(query, top_k=5))

    # Build context
    context = context_builder.build_context(chunks)

    # Generate with LLM
    prompt = f"""Answer this question using the provided context.

Context:
{context}

Question: {query}

Answer:"""

    return await llm_complete(prompt)

Anti-Patterns

# ❌ Fixed-size chunking ignoring semantics
chunks = [text[i:i+512] for i in range(0, len(text), 512)]  # Breaks mid-sentence!

# ✅ Better: Semantic chunking
chunks = chunker.chunk(text, document_id)


# ❌ No overlap between chunks
chunks = split_by_size(text, 512)  # Context loss!

# ✅ Better: Add overlap
chunks = semantic_chunker.chunk(text, overlap=50)


# ❌ Embed and search without reranking
results = vector_store.search(query_embedding)  # Approximate!

# ✅ Better: Rerank top results
initial = vector_store.search(query_embedding, top_k=20)
results = reranker.rerank(query, initial, top_k=5)


# ❌ No metadata for filtering
chunk = Chunk(content=text)  # Can't filter!

# ✅ Better: Rich metadata
chunk = Chunk(
    content=text,
    metadata={
        "source": "docs",
        "date": "2025-01-15",
        "category": "technical"
    }
)

Best Practices Checklist

  • Use semantic chunking (paragraphs/sections, not arbitrary splits)
  • Add overlap between chunks (50-100 tokens)
  • Include rich metadata for filtering
  • Implement reranking for top results
  • Use query rewriting/expansion
  • Respect token limits when building context
  • Track retrieval metrics (precision, recall)
  • Embed chunks asynchronously in batches
  • Cache embeddings for frequently accessed documents
  • Test different chunk sizes for your use case

Auto-Apply

When building RAG systems:

  1. Implement semantic chunking with overlap
  2. Add comprehensive metadata to chunks
  3. Use async for all embedding/retrieval operations
  4. Implement reranking for top results
  5. Build context within token limits
  6. Log retrieval quality metrics
  • llm-app-architecture - For LLM integration
  • async-await-checker - For async patterns
  • pydantic-models - For data validation
  • observability-logging - For retrieval logging
  • performance-profiling - For optimization