RAG (Retrieval Augmented Generation) Overview

What is RAG?

RAG combines large language models with external knowledge retrieval to produce accurate, up-to-date, and contextual responses. Instead of relying solely on the model's training data, a RAG system fetches relevant information from a knowledge base at query time and passes it to the model as context.
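
At query time the flow is simply "retrieve, then generate". A minimal sketch of that loop, assuming a LangChain-style vector_store and llm like the ones configured later on this page (the names here are illustrative):

def answer(question: str, vector_store, llm, k: int = 5):
    # 1. Retrieve the k chunks most similar to the question
    docs = vector_store.similarity_search(question, k=k)

    # 2. Augment the prompt with the retrieved context
    context = "\n\n".join(doc.page_content for doc in docs)
    prompt = (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}\nAnswer:"
    )

    # 3. Generate a grounded response
    return llm.invoke(prompt)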

Architecture Components

graph LR
    subgraph "Ingestion Pipeline"
        DOCS[Documents] --> LOAD[Document Loader]
        LOAD --> SPLIT[Text Splitter]
        SPLIT --> EMBED1[Embedding Model]
        EMBED1 --> VECTOR[(Vector DB)]
    end

    subgraph "Query Pipeline"
        QUERY[User Query] --> EMBED2[Embedding Model]
        EMBED2 --> SEARCH[Similarity Search]
        VECTOR --> SEARCH
        SEARCH --> CONTEXT[Retrieved Context]
        CONTEXT --> PROMPT[Prompt Construction]
        QUERY --> PROMPT
        PROMPT --> LLM[LLM]
        LLM --> RESPONSE[Response]
    end

Implementation Example

Document Ingestion

from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Qdrant
import qdrant_client

class RAGIngestionPipeline:
    def __init__(self, collection_name="easychamp_docs"):
        self.embeddings = OpenAIEmbeddings()
        # Direct Qdrant client handle (handy for collection management; not used by from_documents below)
        self.client = qdrant_client.QdrantClient(host="localhost", port=6333)
        self.collection_name = collection_name

    def ingest_documents(self, directory_path: str):
        # Load documents
        loader = DirectoryLoader(
            directory_path,
            glob="**/*.md",  # Load markdown files
            show_progress=True
        )
        documents = loader.load()

        # Split into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len,
            separators=["\n\n", "\n", " ", ""]
        )
        chunks = text_splitter.split_documents(documents)

        # Add metadata
        for i, chunk in enumerate(chunks):
            chunk.metadata.update({
                "chunk_id": i,
                "source_file": chunk.metadata.get("source", "unknown"),
                "chunk_size": len(chunk.page_content)
            })

        # Create vector store
        vector_store = Qdrant.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            url="http://localhost:6333",
            collection_name=self.collection_name,
            force_recreate=True
        )

        return vector_store

Query Pipeline

from langchain.chains import RetrievalQA
from langchain.chat_models import ChatAnthropic
from langchain.prompts import PromptTemplate

class RAGQueryPipeline:
    def __init__(self, vector_store):
        self.vector_store = vector_store
        # Claude 3 models are chat models, so use ChatAnthropic rather than the completion-style Anthropic wrapper
        self.llm = ChatAnthropic(model="claude-3-opus-20240229")
        self.setup_chain()

    def setup_chain(self):
        # Custom prompt template
        prompt_template = """You are an AI assistant for EasyChamp documentation.
        Use the following context to answer the question. If you don't know the answer,
        say so - don't make up information.

        Context:
        {context}

        Question: {question}

        Answer: """

        PROMPT = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )

        # Create retrieval chain
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.vector_store.as_retriever(
                search_type="similarity",
                search_kwargs={"k": 5}  # Retrieve top 5 chunks
            ),
            chain_type_kwargs={"prompt": PROMPT},
            return_source_documents=True
        )

    def query(self, question: str):
        result = self.qa_chain({"query": question})

        return {
            "answer": result["result"],
            "sources": [doc.metadata["source"] for doc in result["source_documents"]],
            "confidence": self.calculate_confidence(result)
        }

    def calculate_confidence(self, result):
        # Simple confidence based on source relevance
        if len(result["source_documents"]) == 0:
            return 0.0

        # Average similarity scores (assumes the retriever attaches a "score"
        # to each document's metadata; falls back to a neutral 0.5 otherwise)
        scores = [doc.metadata.get("score", 0.5) for doc in result["source_documents"]]
        return sum(scores) / len(scores)
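
A usage sketch wiring the two pipelines together (the ./docs path and the example question are illustrative):

# Illustrative wiring of the ingestion and query pipelines
ingestion = RAGIngestionPipeline(collection_name="easychamp_docs")
vector_store = ingestion.ingest_documents("./docs")  # placeholder path

qa = RAGQueryPipeline(vector_store)
result = qa.query("How do I reset my password?")  # example question

print(result["answer"])
print("Sources:", result["sources"])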

Advanced Techniques

Hybrid Search

Combine dense vector similarity with keyword (BM25) search and fuse the two rankings; this helps when queries contain exact terms such as IDs or product names that dense retrieval alone can miss.

class HybridSearch:
    def __init__(self, vector_store, bm25_index):
        self.vector_store = vector_store
        self.bm25_index = bm25_index

    def search(self, query: str, k: int = 5, alpha: float = 0.5):
        # Vector search
        vector_results = self.vector_store.similarity_search_with_score(
            query, k=k*2
        )

        # Keyword search (BM25)
        keyword_results = self.bm25_index.search(query, k=k*2)

        # Combine and rerank
        combined = self.reciprocal_rank_fusion(
            vector_results, 
            keyword_results,
            alpha=alpha
        )

        return combined[:k]

    def reciprocal_rank_fusion(self, vector_results, keyword_results, alpha=0.5):
        # Weighted reciprocal rank fusion: each document earns alpha/(rank+1)
        # from the vector ranking and (1-alpha)/(rank+1) from the keyword ranking
        scores = {}

        for rank, (doc, score) in enumerate(vector_results):
            doc_id = doc.metadata["chunk_id"]
            scores[doc_id] = scores.get(doc_id, 0) + alpha / (rank + 1)

        for rank, doc in enumerate(keyword_results):
            doc_id = doc["chunk_id"]
            scores[doc_id] = scores.get(doc_id, 0) + (1-alpha) / (rank + 1)

        # Sort by fused score; returns (chunk_id, score) pairs, highest first
        sorted_docs = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        return sorted_docs
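
The bm25_index collaborator above is assumed; one possible implementation, sketched here with the rank_bm25 package (an assumption, not part of the original pipeline), exposes the search(query, k) interface HybridSearch expects:

from rank_bm25 import BM25Okapi

class SimpleBM25Index:
    """Minimal keyword index over the same chunks stored in the vector DB (sketch)."""

    def __init__(self, chunks):
        # chunks: the LangChain Documents produced by the text splitter above
        self.chunks = chunks
        tokenized = [c.page_content.lower().split() for c in chunks]
        self.bm25 = BM25Okapi(tokenized)

    def search(self, query: str, k: int = 5):
        scores = self.bm25.get_scores(query.lower().split())
        top = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
        # Return dicts carrying chunk_id, matching what reciprocal_rank_fusion reads
        return [{"chunk_id": self.chunks[i].metadata["chunk_id"],
                 "score": float(scores[i])} for i in top]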

Contextual Compression

Compress each retrieved chunk so that only the passages relevant to the query are passed to the LLM, reducing prompt size and noise.

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

class CompressedRAG:
    def __init__(self, vector_store, llm):
        self.base_retriever = vector_store.as_retriever()

        # Setup compressor
        compressor = LLMChainExtractor.from_llm(llm)

        self.retriever = ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=self.base_retriever
        )

    def retrieve_compressed(self, query: str):
        # Returns only relevant portions of documents
        compressed_docs = self.retriever.get_relevant_documents(query)
        return compressed_docs

Multi-Query Retrieval

Generate several rephrasings of the user's question, retrieve documents for each, and merge the results for better coverage of the knowledge base.

class MultiQueryRAG:
    def __init__(self, vector_store, llm):
        self.vector_store = vector_store
        self.llm = llm

    def generate_queries(self, original_query: str) -> list:
        prompt = f"""Generate 3 different versions of this question to retrieve relevant documents:

        Original question: {original_query}

        Alternative questions:
        1."""

        response = self.llm.invoke(prompt)

        # Parse generated queries
        queries = [original_query]
        queries.extend(self.parse_queries(response))

        return queries

    def parse_queries(self, response) -> list:
        # Simple parser (assumption): split the LLM output into lines and strip list numbering
        text = response if isinstance(response, str) else response.content
        lines = [line.strip() for line in text.split("\n") if line.strip()]
        return [line.lstrip("0123456789. ") for line in lines]

    def retrieve_multi_query(self, query: str, k: int = 5):
        # Generate alternative queries
        queries = self.generate_queries(query)

        # Retrieve for each query
        all_docs = []
        seen_ids = set()

        for q in queries:
            docs = self.vector_store.similarity_search(q, k=k)

            for doc in docs:
                doc_id = doc.metadata.get("chunk_id")
                if doc_id not in seen_ids:
                    all_docs.append(doc)
                    seen_ids.add(doc_id)

        # Rerank based on relevance to the original query
        # (rerank_documents is an application helper; a sketch follows below)
        return self.rerank_documents(all_docs, query)[:k]
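
rerank_documents is left to the application. A minimal version could re-score the deduplicated documents against the original query with the same embedding model; a sketch, assuming an embeddings object such as OpenAIEmbeddings (it could be bound onto MultiQueryRAG as rerank_documents):

import numpy as np

def rerank_documents_by_embedding(docs, query, embeddings):
    # Hypothetical helper: order documents by cosine similarity to the original query
    query_vec = np.array(embeddings.embed_query(query))
    doc_vecs = embeddings.embed_documents([d.page_content for d in docs])

    def cosine(a, b):
        b = np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

    scored = sorted(zip(docs, doc_vecs),
                    key=lambda pair: cosine(query_vec, pair[1]),
                    reverse=True)
    return [doc for doc, _ in scored]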

Chunking Strategies

Semantic Chunking

import math

class SemanticChunker:
    def __init__(self, embeddings, threshold=0.8):
        self.embeddings = embeddings
        self.threshold = threshold

    def chunk_document(self, text: str) -> list:
        sentences = text.split('. ')
        chunks = []
        current_chunk = []

        for i, sentence in enumerate(sentences):
            if not current_chunk:
                current_chunk.append(sentence)
                continue

            # Check semantic similarity
            chunk_embedding = self.embeddings.embed_query(' '.join(current_chunk))
            sentence_embedding = self.embeddings.embed_query(sentence)

            similarity = self.cosine_similarity(chunk_embedding, sentence_embedding)

            if similarity > self.threshold:
                current_chunk.append(sentence)
            else:
                # Start new chunk
                chunks.append(' '.join(current_chunk))
                current_chunk = [sentence]

        if current_chunk:
            chunks.append(' '.join(current_chunk))

        return chunks

    @staticmethod
    def cosine_similarity(a, b):
        # Plain cosine similarity between two embedding vectors
        dot = sum(x * y for x, y in zip(a, b))
        norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
        return dot / norm if norm else 0.0

Hierarchical Chunking

from langchain.text_splitter import RecursiveCharacterTextSplitter

class HierarchicalChunker:
    def __init__(self):
        self.chunk_sizes = [512, 1024, 2048]  # Multiple granularities

    def create_chunks(self, document: str) -> dict:
        chunks = {}

        for size in self.chunk_sizes:
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=size,
                chunk_overlap=size // 10
            )
            chunks[f"size_{size}"] = splitter.split_text(document)

        return chunks

    def retrieve_hierarchical(self, query: str, vector_stores: dict):
        # Start with smallest chunks for precision
        results = vector_stores["size_512"].similarity_search(query, k=10)

        # If the small chunks don't provide enough context, add larger parent chunks
        # (needs_more_context is an application heuristic; a sketch follows below)
        if self.needs_more_context(results):
            parent_chunks = vector_stores["size_1024"].similarity_search(query, k=5)
            results.extend(parent_chunks)

        return results
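
needs_more_context is an application heuristic; one simple version (an assumption), added to HierarchicalChunker as a method, treats the retrieved text as insufficient when it falls short of a character budget:

    def needs_more_context(self, results, min_chars: int = 2000) -> bool:
        # Heuristic: fall back to larger parent chunks when the small chunks
        # retrieved so far do not fill the desired context budget
        total_chars = sum(len(doc.page_content) for doc in results)
        return total_chars < min_chars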

Performance Optimization

Caching Strategy

import hashlib
import json
from functools import lru_cache
import redis

class RAGCache:
    def __init__(self, embedding_model=None):
        self.embedding_model = embedding_model  # used by get_embedding below
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        self.cache_ttl = 3600  # 1 hour

    def get_cache_key(self, query: str, params: dict) -> str:
        # Create deterministic cache key
        key_string = f"{query}:{sorted(params.items())}"
        return hashlib.md5(key_string.encode()).hexdigest()

    def get_cached_response(self, query: str, params: dict):
        key = self.get_cache_key(query, params)
        cached = self.redis_client.get(key)

        if cached:
            return json.loads(cached)
        return None

    def cache_response(self, query: str, params: dict, response: dict):
        key = self.get_cache_key(query, params)
        self.redis_client.setex(
            key,
            self.cache_ttl,
            json.dumps(response)
        )

    @lru_cache(maxsize=1000)
    def get_embedding(self, text: str):
        # Cache embeddings in memory (lru_cache keys on (self, text))
        return self.embedding_model.embed_query(text)
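
A usage sketch of the cache-aside pattern around the query pipeline, assuming qa is the RAGQueryPipeline instance from above and its result dict is JSON-serializable (parameter values are illustrative):

cache = RAGCache()
params = {"k": 5, "collection": "easychamp_docs"}  # anything that affects the answer

def cached_query(question: str):
    # Cache-aside: serve from Redis when possible, otherwise run the pipeline and store the result
    hit = cache.get_cached_response(question, params)
    if hit is not None:
        return hit

    response = qa.query(question)  # qa: the RAGQueryPipeline instance from above
    cache.cache_response(question, params, response)
    return response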

Batch Processing

import asyncio

class BatchRAG:
    def __init__(self, vector_store, llm, batch_size=10):
        self.vector_store = vector_store
        self.llm = llm  # needed by process_single_query below
        self.batch_size = batch_size

    async def process_queries_batch(self, queries: list):
        results = []

        for i in range(0, len(queries), self.batch_size):
            batch = queries[i:i + self.batch_size]

            # Process batch in parallel
            batch_results = await asyncio.gather(*[
                self.process_single_query(q) for q in batch
            ])

            results.extend(batch_results)

        return results

    async def process_single_query(self, query: str):
        # Async query processing
        docs = await self.vector_store.asimilarity_search(query)
        # build_prompt is an application helper that formats the query and retrieved docs into a prompt
        response = await self.llm.agenerate([self.build_prompt(query, docs)])

        return {
            "query": query,
            "response": response,
            "sources": [d.metadata for d in docs]
        }
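
A usage sketch, assuming vector_store and llm as configured earlier and a build_prompt helper supplied by the application (the queries are placeholders):

import asyncio

batch_rag = BatchRAG(vector_store, llm, batch_size=10)
queries = ["What is RAG?", "How are documents chunked?"]  # placeholder queries

# Runs the queries concurrently, batch_size at a time
results = asyncio.run(batch_rag.process_queries_batch(queries))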

Evaluation Metrics

Retrieval Quality

class RAGEvaluator:
    def __init__(self, retriever):
        self.retriever = retriever

    def evaluate_retrieval(self, test_queries, ground_truth):
        metrics = {
            "precision_at_k": [],
            "recall_at_k": [],
            "mrr": []  # Mean Reciprocal Rank
        }

        for query, true_docs in zip(test_queries, ground_truth):
            retrieved = self.retriever.get_relevant_documents(query)
            retrieved_ids = [d.metadata["doc_id"] for d in retrieved]

            # Calculate metrics (guard against empty retrieval results)
            relevant_retrieved = set(retrieved_ids) & set(true_docs)
            precision = len(relevant_retrieved) / max(len(retrieved_ids), 1)
            recall = len(relevant_retrieved) / max(len(true_docs), 1)

            metrics["precision_at_k"].append(precision)
            metrics["recall_at_k"].append(recall)

            # MRR
            for i, doc_id in enumerate(retrieved_ids):
                if doc_id in true_docs:
                    metrics["mrr"].append(1 / (i + 1))
                    break
            else:
                metrics["mrr"].append(0)

        return {
            k: sum(v) / len(v) for k, v in metrics.items()
        }

Answer Quality

from rouge_score import rouge_scorer
from sklearn.metrics.pairwise import cosine_similarity

class AnswerEvaluator:
    def __init__(self, embeddings):
        self.embeddings = embeddings  # e.g. the same embedding model used for retrieval
        self.rouge_scorer = rouge_scorer.RougeScorer(
            ['rouge1', 'rouge2', 'rougeL'],
            use_stemmer=True
        )

    def evaluate_answers(self, generated_answers, reference_answers):
        scores = []

        for gen, ref in zip(generated_answers, reference_answers):
            rouge_scores = self.rouge_scorer.score(ref, gen)

            scores.append({
                "rouge1": rouge_scores['rouge1'].fmeasure,
                "rouge2": rouge_scores['rouge2'].fmeasure,
                "rougeL": rouge_scores['rougeL'].fmeasure,
                "semantic_similarity": self.calculate_semantic_similarity(gen, ref)
            })

        return scores

    def calculate_semantic_similarity(self, text1, text2):
        # Use embeddings to calculate semantic similarity
        emb1 = self.embeddings.embed_query(text1)
        emb2 = self.embeddings.embed_query(text2)

        return cosine_similarity([emb1], [emb2])[0][0]

Production Considerations

Monitoring

import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class RAGMonitor:
    def __init__(self):
        self.metrics = {
            "query_latency": [],
            "retrieval_latency": [],
            "generation_latency": [],
            "cache_hit_rate": 0,
            "error_rate": 0
        }

    def log_query(self, query, response, latencies):
        # Track performance metrics
        self.metrics["query_latency"].append(latencies["total"])
        self.metrics["retrieval_latency"].append(latencies["retrieval"])
        self.metrics["generation_latency"].append(latencies["generation"])

        # Log to observability platform
        logger.info("RAG Query", extra={
            "query": query,
            "response_length": len(response),
            "latencies": latencies,
            "timestamp": datetime.now()
        })

    def alert_on_degradation(self):
        # Average over up to the last 100 queries (guard against an empty window)
        recent = self.metrics["query_latency"][-100:]
        if not recent:
            return
        avg_latency = sum(recent) / len(recent)

        if avg_latency > 5000:  # 5 seconds (latencies recorded in ms)
            # send_alert and calculate_p95 are application helpers (p95 sketch below)
            self.send_alert("High RAG latency detected", {
                "average_latency": avg_latency,
                "p95_latency": self.calculate_p95(self.metrics["query_latency"])
            })
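
send_alert and calculate_p95 are assumed helpers; a nearest-rank p95 could look like this sketch (shown as a free function, easily bound onto RAGMonitor):

import math

def calculate_p95(latencies):
    # Nearest-rank 95th percentile; assumes a non-empty list of latencies
    ordered = sorted(latencies)
    rank = math.ceil(0.95 * len(ordered))
    return ordered[min(rank, len(ordered)) - 1]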

Security

class SecureRAG:
    def __init__(self, rag_pipeline):
        # ContentFilter, RateLimiter, and the filtering helpers below are
        # application-specific components (placeholders in this sketch)
        self.rag_pipeline = rag_pipeline
        self.content_filter = ContentFilter()
        self.rate_limiter = RateLimiter()

    def process_query(self, query: str, user_id: str):
        # Rate limiting
        if not self.rate_limiter.allow(user_id):
            raise RateLimitExceeded()

        # Input sanitization (a sanitize_input sketch follows below)
        sanitized_query = self.sanitize_input(query)

        # Content filtering
        if self.content_filter.is_harmful(sanitized_query):
            return "I cannot process this request."

        # Process with access control
        allowed_docs = self.get_user_accessible_docs(user_id)
        response = self.rag_pipeline.query(
            sanitized_query,
            filter={"doc_id": {"$in": allowed_docs}}
        )

        # Output filtering
        return self.filter_sensitive_info(response)
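
sanitize_input is likewise application-specific; a conservative sketch (not a complete defense against prompt injection) caps the query length and strips control characters:

import re

MAX_QUERY_CHARS = 1000  # illustrative limit

def sanitize_input(query: str) -> str:
    # Trim whitespace, cap length, and remove control characters before the
    # query reaches the prompt; this reduces, but does not eliminate, injection risk
    query = query.strip()[:MAX_QUERY_CHARS]
    return re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", query)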

Cost Optimization

Embedding Cost Reduction

Use a small local embedding model for coarse retrieval, then rerank only the top candidates with the larger (paid) model.

from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from langchain.embeddings import OpenAIEmbeddings

class EfficientEmbeddings:
    def __init__(self):
        # Use smaller, local model for initial filtering
        self.small_model = SentenceTransformer('all-MiniLM-L6-v2')
        # Use larger model for final ranking
        self.large_model = OpenAIEmbeddings()

    def two_stage_retrieval(self, query: str, documents: list, k: int = 5):
        # Stage 1: Fast retrieval with small model
        query_emb_small = self.small_model.encode(query)
        doc_embs_small = self.small_model.encode(documents)

        # Get top 20 candidates
        similarities = cosine_similarity([query_emb_small], doc_embs_small)[0]
        top_indices = similarities.argsort()[-20:][::-1]

        # Stage 2: Rerank with large model
        candidates = [documents[i] for i in top_indices]
        query_emb_large = self.large_model.embed_query(query)
        candidate_embs = self.large_model.embed_documents(candidates)

        # Final ranking
        final_similarities = cosine_similarity([query_emb_large], candidate_embs)[0]
        final_indices = final_similarities.argsort()[-k:][::-1]

        return [candidates[i] for i in final_indices]

Last updated: 2025-09-09