RAG (Retrieval Augmented Generation) Overview¶
What is RAG?¶
RAG combines the power of large language models with external knowledge retrieval to provide accurate, up-to-date, and contextual responses. Instead of relying solely on the model's training data, RAG systems fetch relevant information from a knowledge base at query time.
Architecture Components¶
graph LR
subgraph "Ingestion Pipeline"
DOCS[Documents] --> LOAD[Document Loader]
LOAD --> SPLIT[Text Splitter]
SPLIT --> EMBED1[Embedding Model]
EMBED1 --> VECTOR[(Vector DB)]
end
subgraph "Query Pipeline"
QUERY[User Query] --> EMBED2[Embedding Model]
EMBED2 --> SEARCH[Similarity Search]
VECTOR --> SEARCH
SEARCH --> CONTEXT[Retrieved Context]
CONTEXT --> PROMPT[Prompt Construction]
QUERY --> PROMPT
PROMPT --> LLM[LLM]
LLM --> RESPONSE[Response]
end
Implementation Example¶
Document Ingestion¶
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Qdrant
import qdrant_client
class RAGIngestionPipeline:
def __init__(self, collection_name="easychamp_docs"):
self.embeddings = OpenAIEmbeddings()
self.client = qdrant_client.QdrantClient(host="localhost", port=6333)
self.collection_name = collection_name
def ingest_documents(self, directory_path: str):
# Load documents
loader = DirectoryLoader(
directory_path,
glob="**/*.md", # Load markdown files
show_progress=True
)
documents = loader.load()
# Split into chunks
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
chunks = text_splitter.split_documents(documents)
# Add metadata
for i, chunk in enumerate(chunks):
chunk.metadata.update({
"chunk_id": i,
"source_file": chunk.metadata.get("source", "unknown"),
"chunk_size": len(chunk.page_content)
})
# Create vector store
vector_store = Qdrant.from_documents(
documents=chunks,
embedding=self.embeddings,
url="http://localhost:6333",
collection_name=self.collection_name,
force_recreate=True
)
return vector_store
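A minimal usage sketch for the ingestion pipeline above; the directory path is a placeholder and it assumes Qdrant is running locally on port 6333.
# Hypothetical usage; "./docs" is a placeholder for your documentation directory
pipeline = RAGIngestionPipeline(collection_name="easychamp_docs")
vector_store = pipeline.ingest_documents("./docs")
print(f"Ingested markdown files into collection '{pipeline.collection_name}'")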
Query Pipeline¶
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatAnthropic
from langchain.prompts import PromptTemplate
class RAGQueryPipeline:
def __init__(self, vector_store):
self.vector_store = vector_store
        self.llm = ChatAnthropic(model="claude-3-opus-20240229")  # Claude 3 models require the chat/messages API
self.setup_chain()
def setup_chain(self):
# Custom prompt template
prompt_template = """You are an AI assistant for EasyChamp documentation.
Use the following context to answer the question. If you don't know the answer,
say so - don't make up information.
Context:
{context}
Question: {question}
Answer: """
PROMPT = PromptTemplate(
template=prompt_template,
input_variables=["context", "question"]
)
# Create retrieval chain
self.qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=self.vector_store.as_retriever(
search_type="similarity",
search_kwargs={"k": 5} # Retrieve top 5 chunks
),
chain_type_kwargs={"prompt": PROMPT},
return_source_documents=True
)
def query(self, question: str):
result = self.qa_chain({"query": question})
return {
"answer": result["result"],
"sources": [doc.metadata["source"] for doc in result["source_documents"]],
"confidence": self.calculate_confidence(result)
}
def calculate_confidence(self, result):
# Simple confidence based on source relevance
if len(result["source_documents"]) == 0:
return 0.0
        # Average the similarity scores if the retriever stored them in metadata
        # (many vector store retrievers do not, so this falls back to 0.5)
        scores = [doc.metadata.get("score", 0.5) for doc in result["source_documents"]]
return sum(scores) / len(scores)
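A short end-to-end sketch wiring the two pipelines together; the question is only an illustrative placeholder.
# Hypothetical end-to-end usage of the ingestion and query pipelines
store = RAGIngestionPipeline().ingest_documents("./docs")
rag = RAGQueryPipeline(store)
result = rag.query("What topics does this documentation cover?")
print(result["answer"])
print("Sources:", result["sources"])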
Advanced Techniques¶
Hybrid Search¶
Combine vector similarity with keyword search.
class HybridSearch:
def __init__(self, vector_store, bm25_index):
self.vector_store = vector_store
self.bm25_index = bm25_index
def search(self, query: str, k: int = 5, alpha: float = 0.5):
# Vector search
vector_results = self.vector_store.similarity_search_with_score(
query, k=k*2
)
# Keyword search (BM25)
keyword_results = self.bm25_index.search(query, k=k*2)
        # Combine and re-rank the two result lists
        combined = self.reciprocal_rank_fusion(
            vector_results,
            keyword_results,
            alpha=alpha
        )
        # Fusion returns (chunk_id, score) pairs; map the ids back to documents
        # before building the prompt
        return combined[:k]
    def reciprocal_rank_fusion(self, vector_results, keyword_results, alpha=0.5):
        # Weighted variant of reciprocal rank fusion: each ranking contributes
        # alpha / rank (vector) or (1 - alpha) / rank (keyword)
        scores = {}
for rank, (doc, score) in enumerate(vector_results):
doc_id = doc.metadata["chunk_id"]
scores[doc_id] = scores.get(doc_id, 0) + alpha / (rank + 1)
for rank, doc in enumerate(keyword_results):
doc_id = doc["chunk_id"]
scores[doc_id] = scores.get(doc_id, 0) + (1-alpha) / (rank + 1)
# Sort by combined score
sorted_docs = sorted(scores.items(), key=lambda x: x[1], reverse=True)
return sorted_docs
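HybridSearch expects a bm25_index object exposing search(query, k). One way to provide it, sketched here as an assumption rather than part of the original pipeline, is a thin wrapper around the rank_bm25 package that indexes the same chunks by their chunk_id metadata.
from rank_bm25 import BM25Okapi
class BM25Index:
    def __init__(self, chunks):
        # Index the same chunk objects that were stored in the vector DB
        self.chunks = chunks
        tokenized = [c.page_content.lower().split() for c in chunks]
        self.bm25 = BM25Okapi(tokenized)
    def search(self, query: str, k: int = 5):
        scores = self.bm25.get_scores(query.lower().split())
        top = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
        # Return dicts carrying chunk_id so reciprocal_rank_fusion can merge by id
        return [
            {"chunk_id": self.chunks[i].metadata["chunk_id"], "score": float(scores[i])}
            for i in top
        ]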
Contextual Compression¶
Reduce retrieved context to relevant parts.
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
class CompressedRAG:
def __init__(self, vector_store, llm):
self.base_retriever = vector_store.as_retriever()
# Setup compressor
compressor = LLMChainExtractor.from_llm(llm)
self.retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=self.base_retriever
)
def retrieve_compressed(self, query: str):
# Returns only relevant portions of documents
compressed_docs = self.retriever.get_relevant_documents(query)
return compressed_docs
Multi-Query Retrieval¶
Generate multiple queries for better coverage.
class MultiQueryRAG:
def __init__(self, vector_store, llm):
self.vector_store = vector_store
self.llm = llm
def generate_queries(self, original_query: str) -> list:
prompt = f"""Generate 3 different versions of this question to retrieve relevant documents:
Original question: {original_query}
Alternative questions:
1."""
response = self.llm.invoke(prompt)
# Parse generated queries
queries = [original_query]
queries.extend(self.parse_queries(response))
return queries
def retrieve_multi_query(self, query: str, k: int = 5):
# Generate alternative queries
queries = self.generate_queries(query)
# Retrieve for each query
all_docs = []
seen_ids = set()
for q in queries:
docs = self.vector_store.similarity_search(q, k=k)
for doc in docs:
doc_id = doc.metadata.get("chunk_id")
if doc_id not in seen_ids:
all_docs.append(doc)
seen_ids.add(doc_id)
# Rerank based on relevance to original query
return self.rerank_documents(all_docs, query)[:k]
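MultiQueryRAG above calls parse_queries and rerank_documents without defining them. A minimal sketch of both, written as standalone functions to bind or adapt as needed; it assumes the LLM returns one alternative question per line and reuses an embedding model for cosine-similarity reranking.
import numpy as np
def parse_queries(response) -> list:
    # Handle both chat (message) and plain-string LLM outputs
    text = response.content if hasattr(response, "content") else str(response)
    # Treat each non-empty line as one alternative query, stripping list numbering
    lines = [line.strip().lstrip("0123456789.-) ").strip() for line in text.split("\n")]
    return [line for line in lines if line]
def rerank_documents(docs, original_query: str, embeddings):
    # Order candidates by cosine similarity to the original query embedding
    q = np.array(embeddings.embed_query(original_query))
    def score(doc):
        d = np.array(embeddings.embed_query(doc.page_content))
        return float(np.dot(q, d) / (np.linalg.norm(q) * np.linalg.norm(d)))
    return sorted(docs, key=score, reverse=True)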
Chunking Strategies¶
Semantic Chunking¶
import numpy as np
class SemanticChunker:
    def __init__(self, embeddings, threshold=0.8):
self.embeddings = embeddings
self.threshold = threshold
def chunk_document(self, text: str) -> list:
sentences = text.split('. ')
chunks = []
current_chunk = []
for i, sentence in enumerate(sentences):
if not current_chunk:
current_chunk.append(sentence)
continue
# Check semantic similarity
chunk_embedding = self.embeddings.embed_query(' '.join(current_chunk))
sentence_embedding = self.embeddings.embed_query(sentence)
similarity = self.cosine_similarity(chunk_embedding, sentence_embedding)
if similarity > self.threshold:
current_chunk.append(sentence)
else:
# Start new chunk
chunks.append(' '.join(current_chunk))
current_chunk = [sentence]
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return chunks
    def cosine_similarity(self, a, b) -> float:
        # Plain cosine similarity between two embedding vectors
        a, b = np.array(a), np.array(b)
        return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
Hierarchical Chunking¶
from langchain.text_splitter import RecursiveCharacterTextSplitter
class HierarchicalChunker:
def __init__(self):
self.chunk_sizes = [512, 1024, 2048] # Multiple granularities
def create_chunks(self, document: str) -> dict:
chunks = {}
for size in self.chunk_sizes:
splitter = RecursiveCharacterTextSplitter(
chunk_size=size,
chunk_overlap=size // 10
)
chunks[f"size_{size}"] = splitter.split_text(document)
return chunks
def retrieve_hierarchical(self, query: str, vector_stores: dict):
# Start with smallest chunks for precision
results = vector_stores["size_512"].similarity_search(query, k=10)
# If not enough context, use larger chunks
if self.needs_more_context(results):
parent_chunks = vector_stores["size_1024"].similarity_search(query, k=5)
results.extend(parent_chunks)
        return results
    def needs_more_context(self, results, min_chars: int = 2000) -> bool:
        # Simple heuristic: fall back to larger chunks when the retrieved text is short
        return sum(len(doc.page_content) for doc in results) < min_chars
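retrieve_hierarchical expects one vector store per granularity; a sketch of building that mapping from the chunks above, assuming Qdrant as in the ingestion example (document_text and the collection names are placeholders).
# Hypothetical setup: one Qdrant collection per chunk size
chunker = HierarchicalChunker()
chunks_by_size = chunker.create_chunks(document_text)
vector_stores = {
    name: Qdrant.from_texts(
        texts,
        embedding=OpenAIEmbeddings(),
        url="http://localhost:6333",
        collection_name=f"easychamp_docs_{name}",
    )
    for name, texts in chunks_by_size.items()
}
results = chunker.retrieve_hierarchical("example question", vector_stores)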
Performance Optimization¶
Caching Strategy¶
import hashlib
import json
from functools import lru_cache
import redis
class RAGCache:
def __init__(self):
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
self.cache_ttl = 3600 # 1 hour
def get_cache_key(self, query: str, params: dict) -> str:
# Create deterministic cache key
key_string = f"{query}:{sorted(params.items())}"
return hashlib.md5(key_string.encode()).hexdigest()
def get_cached_response(self, query: str, params: dict):
key = self.get_cache_key(query, params)
cached = self.redis_client.get(key)
if cached:
return json.loads(cached)
return None
def cache_response(self, query: str, params: dict, response: dict):
key = self.get_cache_key(query, params)
self.redis_client.setex(
key,
self.cache_ttl,
json.dumps(response)
)
    @lru_cache(maxsize=1000)
    def get_embedding(self, text: str):
        # In-memory embedding cache; assumes an embedding_model attribute on the instance
        # (note: lru_cache on an instance method also keys on self)
        return self.embedding_model.embed_query(text)
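A sketch of a cache-aside wrapper around the query pipeline; the parameter dict is illustrative and should include anything that changes the answer (retriever k, model name, and so on).
# Hypothetical cache-aside usage of RAGCache with RAGQueryPipeline
cache = RAGCache()
def cached_query(rag, question: str, params=None):
    params = params or {"k": 5}
    hit = cache.get_cached_response(question, params)
    if hit is not None:
        return hit
    response = rag.query(question)
    cache.cache_response(question, params, response)
    return response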
Batch Processing¶
import asyncio
class BatchRAG:
    def __init__(self, vector_store, llm, batch_size=10):
        self.vector_store = vector_store
        self.llm = llm
        self.batch_size = batch_size
async def process_queries_batch(self, queries: list):
results = []
for i in range(0, len(queries), self.batch_size):
batch = queries[i:i + self.batch_size]
# Process batch in parallel
batch_results = await asyncio.gather(*[
self.process_single_query(q) for q in batch
])
results.extend(batch_results)
return results
    async def process_single_query(self, query: str):
        # Async retrieval, then async generation
        docs = await self.vector_store.asimilarity_search(query)
        # build_prompt is assumed to combine the query with the retrieved context
        response = await self.llm.agenerate([self.build_prompt(query, docs)])
return {
"query": query,
"response": response,
"sources": [d.metadata for d in docs]
}
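Batch processing is driven from an event loop; a minimal sketch, assuming the vector store and LLM expose the async methods used above and that build_prompt is implemented.
# Hypothetical driver; vector_store and llm are assumed to be built elsewhere
async def main():
    batcher = BatchRAG(vector_store, llm, batch_size=10)
    results = await batcher.process_queries_batch([
        "example question one",
        "example question two",
    ])
    for item in results:
        print(item["query"], "->", len(item["sources"]), "source chunks")
asyncio.run(main())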
Evaluation Metrics¶
Retrieval Quality¶
class RAGEvaluator:
    def __init__(self, retriever):
        self.retriever = retriever
    def evaluate_retrieval(self, test_queries, ground_truth):
metrics = {
"precision_at_k": [],
"recall_at_k": [],
"mrr": [] # Mean Reciprocal Rank
}
for query, true_docs in zip(test_queries, ground_truth):
retrieved = self.retriever.get_relevant_documents(query)
retrieved_ids = [d.metadata["doc_id"] for d in retrieved]
# Calculate metrics
precision = len(set(retrieved_ids) & set(true_docs)) / len(retrieved_ids)
recall = len(set(retrieved_ids) & set(true_docs)) / len(true_docs)
metrics["precision_at_k"].append(precision)
metrics["recall_at_k"].append(recall)
# MRR
for i, doc_id in enumerate(retrieved_ids):
if doc_id in true_docs:
metrics["mrr"].append(1 / (i + 1))
break
else:
metrics["mrr"].append(0)
return {
k: sum(v) / len(v) for k, v in metrics.items()
}
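The evaluator expects parallel lists: one entry per test query, with ground_truth holding the relevant doc_id values for that query. A small illustrative fixture (the ids and questions are made up):
# Hypothetical evaluation fixture; doc ids and questions are illustrative only
test_queries = [
    "How are documents chunked before indexing?",
    "Which vector database stores the embeddings?",
]
ground_truth = [
    ["doc_12", "doc_13"],  # relevant doc_ids for the first query
    ["doc_42"],            # relevant doc_ids for the second query
]
evaluator = RAGEvaluator(retriever)  # retriever built from the vector store
print(evaluator.evaluate_retrieval(test_queries, ground_truth))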
Answer Quality¶
from rouge_score import rouge_scorer
from sklearn.metrics.pairwise import cosine_similarity
class AnswerEvaluator:
    def __init__(self, embeddings):
        self.embeddings = embeddings
        self.rouge_scorer = rouge_scorer.RougeScorer(
['rouge1', 'rouge2', 'rougeL'],
use_stemmer=True
)
def evaluate_answers(self, generated_answers, reference_answers):
scores = []
for gen, ref in zip(generated_answers, reference_answers):
rouge_scores = self.rouge_scorer.score(ref, gen)
scores.append({
"rouge1": rouge_scores['rouge1'].fmeasure,
"rouge2": rouge_scores['rouge2'].fmeasure,
"rougeL": rouge_scores['rougeL'].fmeasure,
"semantic_similarity": self.calculate_semantic_similarity(gen, ref)
})
return scores
def calculate_semantic_similarity(self, text1, text2):
# Use embeddings to calculate semantic similarity
emb1 = self.embeddings.embed_query(text1)
emb2 = self.embeddings.embed_query(text2)
return cosine_similarity([emb1], [emb2])[0][0]
Production Considerations¶
Monitoring¶
import logging
from datetime import datetime
logger = logging.getLogger(__name__)
class RAGMonitor:
def __init__(self):
self.metrics = {
"query_latency": [],
"retrieval_latency": [],
"generation_latency": [],
"cache_hit_rate": 0,
"error_rate": 0
}
def log_query(self, query, response, latencies):
# Track performance metrics
self.metrics["query_latency"].append(latencies["total"])
self.metrics["retrieval_latency"].append(latencies["retrieval"])
self.metrics["generation_latency"].append(latencies["generation"])
# Log to observability platform
logger.info("RAG Query", extra={
"query": query,
"response_length": len(response),
"latencies": latencies,
"timestamp": datetime.now()
})
    def alert_on_degradation(self):
        # send_alert and calculate_p95 are assumed to be provided by the alerting integration
        recent = self.metrics["query_latency"][-100:]
        if not recent:
            return
        avg_latency = sum(recent) / len(recent)
        if avg_latency > 5000:  # 5 seconds, assuming latencies are tracked in ms
            self.send_alert("High RAG latency detected", {
                "average_latency": avg_latency,
                "p95_latency": self.calculate_p95(recent)
            })
Security¶
class SecureRAG:
    # ContentFilter, RateLimiter, RateLimitExceeded, and the helper methods used
    # below are placeholders for application-specific implementations
    def __init__(self, rag_pipeline):
        self.rag_pipeline = rag_pipeline
        self.content_filter = ContentFilter()
        self.rate_limiter = RateLimiter()
def process_query(self, query: str, user_id: str):
# Rate limiting
if not self.rate_limiter.allow(user_id):
raise RateLimitExceeded()
# Input sanitization
sanitized_query = self.sanitize_input(query)
# Content filtering
if self.content_filter.is_harmful(sanitized_query):
return "I cannot process this request."
# Process with access control
allowed_docs = self.get_user_accessible_docs(user_id)
response = self.rag_pipeline.query(
sanitized_query,
filter={"doc_id": {"$in": allowed_docs}}
)
# Output filtering
return self.filter_sensitive_info(response)
Cost Optimization¶
Embedding Cost Reduction¶
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from langchain.embeddings import OpenAIEmbeddings
class EfficientEmbeddings:
    def __init__(self):
        # Use a smaller, local model for initial filtering
        self.small_model = SentenceTransformer('all-MiniLM-L6-v2')
        # Use a larger hosted model for final ranking
        self.large_model = OpenAIEmbeddings()
def two_stage_retrieval(self, query: str, documents: list, k: int = 5):
# Stage 1: Fast retrieval with small model
query_emb_small = self.small_model.encode(query)
doc_embs_small = self.small_model.encode(documents)
# Get top 20 candidates
similarities = cosine_similarity([query_emb_small], doc_embs_small)[0]
top_indices = similarities.argsort()[-20:][::-1]
# Stage 2: Rerank with large model
candidates = [documents[i] for i in top_indices]
query_emb_large = self.large_model.embed_query(query)
candidate_embs = self.large_model.embed_documents(candidates)
# Final ranking
final_similarities = cosine_similarity([query_emb_large], candidate_embs)[0]
final_indices = final_similarities.argsort()[-k:][::-1]
return [candidates[i] for i in final_indices]
Last updated: 2025-09-09