Semantic and Contextual Chunking Methods
This document provides comprehensive coverage of semantic and contextual chunking approaches for advanced RAG systems.
Overview of Semantic Methods
Semantic chunking methods use the meaning of the text, rather than raw character or token counts, to decide where one chunk ends and the next begins, producing chunks that follow topic boundaries instead of arbitrary size limits.
| Method | Approach | Best For | Complexity |
|---|---|---|---|
| Embedding-Based Similarity | Sentence embeddings to find boundaries | Thematic documents | High |
| Topic Modeling | LDA/NMF to identify topic segments | Mixed-topic documents | Medium |
| Named Entity Recognition | Entity-aware boundaries | Technical/medical docs | Medium |
| Dependency Parsing | Syntactic relationships | Structured content | High |
| Cross-Encoder Scoring | BERT-based boundary detection | High-precision needs | Very High |
1. Embedding-Based Semantic Chunking
Core Concept
Use sentence embeddings to identify semantic boundaries where topic shifts occur.
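Before the full implementation, here is a minimal sketch of the core idea: embed consecutive sentences and start a new chunk wherever the cosine similarity between neighbours drops below a threshold. The model name and the 0.6 threshold are illustrative defaults, not tuned values.
import re
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
def naive_semantic_chunks(text, threshold=0.6):
    # Split into sentences on terminal punctuation
    sentences = [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]
    if len(sentences) < 2:
        return [text]
    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(sentences)
    chunks, current = [], [sentences[0]]
    for i in range(1, len(sentences)):
        # Cosine similarity between this sentence and the previous one
        sim = cosine_similarity(embeddings[i - 1:i], embeddings[i:i + 1])[0][0]
        if sim < threshold:  # similarity drop -> likely topic shift -> new chunk
            chunks.append(" ".join(current))
            current = []
        current.append(sentences[i])
    chunks.append(" ".join(current))
    return chunks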
Advanced Implementation
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
from typing import List, Dict, Tuple
import re
class AdvancedSemanticChunker:
def __init__(self, model_name="all-MiniLM-L6-v2",
boundary_threshold=0.7,
min_chunk_size=3,
max_chunk_size=15,
clustering_method="kmeans"):
self.model = SentenceTransformer(model_name)
self.boundary_threshold = boundary_threshold
self.min_chunk_size = min_chunk_size
self.max_chunk_size = max_chunk_size
self.clustering_method = clustering_method
def multi_level_semantic_chunking(self, text):
"""Multi-level semantic analysis for optimal chunking"""
# Extract sentences
sentences = self._extract_sentences(text)
if len(sentences) <= self.min_chunk_size:
return [{"text": text, "method": "too_short", "level": 0}]
# Generate embeddings
embeddings = self.model.encode(sentences)
# Level 1: Similarity-based boundaries
similarity_boundaries = self._find_similarity_boundaries(embeddings)
similarity_chunks = self._create_chunks_from_boundaries(
sentences, similarity_boundaries, "similarity"
)
# Level 2: Clustering-based boundaries
clustering_boundaries = self._find_clustering_boundaries(embeddings)
clustering_chunks = self._create_chunks_from_boundaries(
sentences, clustering_boundaries, "clustering"
)
# Level 3: Hybrid approach
hybrid_boundaries = self._find_hybrid_boundaries(
similarity_boundaries, clustering_boundaries, embeddings
)
hybrid_chunks = self._create_chunks_from_boundaries(
sentences, hybrid_boundaries, "hybrid"
)
# Evaluate and select best approach
approaches = [
{"chunks": similarity_chunks, "method": "similarity"},
{"chunks": clustering_chunks, "method": "clustering"},
{"chunks": hybrid_chunks, "method": "hybrid"}
]
best_approach = self._evaluate_approaches(approaches, sentences, embeddings)
return best_approach["chunks"]
def _extract_sentences(self, text):
"""Enhanced sentence extraction"""
# Multiple sentence splitting approaches
patterns = [
r'(?<=[.!?])\s+(?=[A-Z])', # Standard sentence boundaries
r'(?<=[.!?]\s)\s*(?=[A-Z])', # Handle multiple spaces
r'(?<=[.!?])\s+(?=[a-z])', # Handle lowercase starts
]
sentences = []
for pattern in patterns:
potential_sentences = re.split(pattern, text)
potential_sentences = [s.strip() for s in potential_sentences if s.strip()]
if len(potential_sentences) > len(sentences):
sentences = potential_sentences
return sentences if sentences else [text]
def _find_similarity_boundaries(self, embeddings):
"""Find boundaries based on similarity drops"""
boundaries = []
for i in range(len(embeddings) - 1):
# Calculate similarity between consecutive sentences
similarity = cosine_similarity(
embeddings[i].reshape(1, -1),
embeddings[i + 1].reshape(1, -1)
)[0][0]
# Dynamic threshold based on local context
local_threshold = self._calculate_local_threshold(embeddings, i)
if similarity < local_threshold:
boundaries.append(i)
return self._filter_boundaries(boundaries, len(embeddings))
def _calculate_local_threshold(self, embeddings, index):
"""Calculate dynamic threshold based on local similarity patterns"""
window_size = min(5, index, len(embeddings) - index - 1)
start_idx = max(0, index - window_size)
end_idx = min(len(embeddings), index + window_size + 1)
local_embeddings = embeddings[start_idx:end_idx]
if len(local_embeddings) < 2:
return self.boundary_threshold
# Calculate local similarity statistics
similarities = []
for i in range(len(local_embeddings) - 1):
sim = cosine_similarity(
local_embeddings[i].reshape(1, -1),
local_embeddings[i + 1].reshape(1, -1)
)[0][0]
similarities.append(sim)
mean_sim = np.mean(similarities)
std_sim = np.std(similarities)
# Threshold based on local statistics
return max(0.3, mean_sim - std_sim * 0.5)
def _find_clustering_boundaries(self, embeddings):
"""Find boundaries using clustering approaches"""
if self.clustering_method == "kmeans":
return self._kmeans_boundaries(embeddings)
elif self.clustering_method == "hierarchical":
return self._hierarchical_boundaries(embeddings)
else:
return self._dbscan_boundaries(embeddings)
def _kmeans_boundaries(self, embeddings):
"""K-means clustering for boundary detection"""
n_clusters = min(max(len(embeddings) // 5, 2), 10) # Adaptive cluster count
kmeans = KMeans(n_clusters=n_clusters, random_state=42)
labels = kmeans.fit_predict(embeddings)
boundaries = []
for i in range(len(labels) - 1):
if labels[i] != labels[i + 1]:
boundaries.append(i)
return self._filter_boundaries(boundaries, len(embeddings))
def _hierarchical_boundaries(self, embeddings):
"""Hierarchical clustering for boundary detection"""
from sklearn.cluster import AgglomerativeClustering
n_clusters = min(max(len(embeddings) // 4, 2), 8)
clustering = AgglomerativeClustering(n_clusters=n_clusters)
labels = clustering.fit_predict(embeddings)
boundaries = []
for i in range(len(labels) - 1):
if labels[i] != labels[i + 1]:
boundaries.append(i)
return self._filter_boundaries(boundaries, len(embeddings))
def _dbscan_boundaries(self, embeddings):
"""DBSCAN clustering for boundary detection"""
from sklearn.cluster import DBSCAN
        # Adaptive eps from a sample of pairwise cosine distances
        # (DBSCAN with metric="cosine" operates on 1 - similarity,
        # so similarities must be converted to distances first)
        distances = []
        for i in range(min(10, len(embeddings))):
            for j in range(i + 1, min(i + 10, len(embeddings))):
                sim = cosine_similarity(
                    embeddings[i].reshape(1, -1),
                    embeddings[j].reshape(1, -1)
                )[0][0]
                distances.append(1.0 - sim)
        eps = np.percentile(distances, 25) if distances else 0.5
clustering = DBSCAN(eps=eps, min_samples=2, metric="cosine")
labels = clustering.fit_predict(embeddings)
boundaries = []
for i in range(len(labels) - 1):
if labels[i] != labels[i + 1] and labels[i] != -1 and labels[i + 1] != -1:
boundaries.append(i)
return self._filter_boundaries(boundaries, len(embeddings))
def _find_hybrid_boundaries(self, similarity_boundaries, clustering_boundaries, embeddings):
"""Combine similarity and clustering approaches"""
# Combine boundaries from both methods
all_boundaries = set(similarity_boundaries) | set(clustering_boundaries)
combined_boundaries = sorted(list(all_boundaries))
# Refine using consensus scoring
refined_boundaries = []
for boundary in combined_boundaries:
score = self._calculate_boundary_score(boundary, embeddings)
if score > 0.5: # Threshold for hybrid approach
refined_boundaries.append(boundary)
return self._filter_boundaries(refined_boundaries, len(embeddings))
def _calculate_boundary_score(self, boundary_idx, embeddings):
"""Calculate confidence score for a boundary"""
if boundary_idx >= len(embeddings) - 1:
return 0.0
# Similarity drop score
similarity = cosine_similarity(
embeddings[boundary_idx].reshape(1, -1),
embeddings[boundary_idx + 1].reshape(1, -1)
)[0][0]
similarity_score = 1.0 - similarity # Lower similarity = higher boundary score
# Local variance score
window_size = min(3, boundary_idx, len(embeddings) - boundary_idx - 1)
start_idx = max(0, boundary_idx - window_size)
end_idx = min(len(embeddings), boundary_idx + window_size + 1)
window_embeddings = embeddings[start_idx:end_idx]
if len(window_embeddings) > 1:
similarities = []
for i in range(len(window_embeddings) - 1):
sim = cosine_similarity(
window_embeddings[i].reshape(1, -1),
window_embeddings[i + 1].reshape(1, -1)
)[0][0]
similarities.append(sim)
variance_score = np.var(similarities)
else:
variance_score = 0.0
# Combined score
return (similarity_score * 0.7 + variance_score * 0.3)
def _filter_boundaries(self, boundaries, total_sentences):
"""Filter boundaries to meet size constraints"""
if not boundaries:
return boundaries
filtered_boundaries = []
last_boundary = -1
for boundary in boundaries:
chunk_size = boundary - last_boundary
# Check if chunk meets minimum size
if chunk_size >= self.min_chunk_size:
# Check if next chunk won't be too small
remaining_sentences = total_sentences - boundary - 1
if (remaining_sentences >= self.min_chunk_size or
boundary == total_sentences - 1):
filtered_boundaries.append(boundary)
last_boundary = boundary
else:
# Chunk too small, try to extend it
next_boundary = boundary + (self.min_chunk_size - chunk_size)
if next_boundary < total_sentences:
filtered_boundaries.append(next_boundary)
last_boundary = next_boundary
return filtered_boundaries
def _create_chunks_from_boundaries(self, sentences, boundaries, method):
"""Create chunks using identified boundaries"""
chunks = []
start_idx = 0
for boundary in boundaries:
if boundary > start_idx:
chunk_sentences = sentences[start_idx:boundary + 1]
chunk_text = " ".join(chunk_sentences)
chunks.append({
"text": chunk_text,
"sentence_count": len(chunk_sentences),
"start_sentence": start_idx,
"end_sentence": boundary,
"method": method
})
start_idx = boundary + 1
# Add remaining sentences
if start_idx < len(sentences):
chunk_sentences = sentences[start_idx:]
chunk_text = " ".join(chunk_sentences)
chunks.append({
"text": chunk_text,
"sentence_count": len(chunk_sentences),
"start_sentence": start_idx,
"end_sentence": len(sentences) - 1,
"method": method
})
return chunks
def _evaluate_approaches(self, approaches, sentences, embeddings):
"""Evaluate different chunking approaches"""
best_approach = approaches[0]
best_score = 0.0
for approach in approaches:
score = self._calculate_approach_score(
approach["chunks"], sentences, embeddings
)
approach["score"] = score
if score > best_score:
best_score = score
best_approach = approach
return best_approach
def _calculate_approach_score(self, chunks, sentences, embeddings):
"""Calculate quality score for a chunking approach"""
if not chunks:
return 0.0
# Coherence score (average intra-chunk similarity)
coherence_scores = []
for chunk in chunks:
start_idx = chunk["start_sentence"]
end_idx = chunk["end_sentence"]
if end_idx > start_idx:
chunk_embeddings = embeddings[start_idx:end_idx + 1]
similarities = []
for i in range(len(chunk_embeddings) - 1):
sim = cosine_similarity(
chunk_embeddings[i].reshape(1, -1),
chunk_embeddings[i + 1].reshape(1, -1)
)[0][0]
similarities.append(sim)
coherence_scores.append(np.mean(similarities) if similarities else 0.0)
coherence_score = np.mean(coherence_scores) if coherence_scores else 0.0
# Boundary score (low inter-chunk similarity)
boundary_scores = []
for i in range(len(chunks) - 1):
current_end = chunks[i]["end_sentence"]
next_start = chunks[i + 1]["start_sentence"]
if current_end < len(embeddings) and next_start < len(embeddings):
similarity = cosine_similarity(
embeddings[current_end].reshape(1, -1),
embeddings[next_start].reshape(1, -1)
)[0][0]
boundary_scores.append(1.0 - similarity) # Lower similarity = better boundary
boundary_score = np.mean(boundary_scores) if boundary_scores else 0.0
# Size appropriateness score
size_scores = []
for chunk in chunks:
sentence_count = chunk["sentence_count"]
if self.min_chunk_size <= sentence_count <= self.max_chunk_size:
size_scores.append(1.0)
else:
# Penalty for size violations
deviation = min(
abs(sentence_count - self.min_chunk_size),
abs(sentence_count - self.max_chunk_size)
)
size_scores.append(max(0.0, 1.0 - deviation / 10.0))
size_score = np.mean(size_scores) if size_scores else 0.0
# Combined score
return (
coherence_score * 0.4 +
boundary_score * 0.4 +
size_score * 0.2
)
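A quick usage sketch for the class above (the sample text is illustrative):
chunker = AdvancedSemanticChunker(boundary_threshold=0.7, min_chunk_size=2)
chunks = chunker.multi_level_semantic_chunking(
    "Transformers rely on attention. Attention weighs every input token. "
    "Cooking pasta starts with boiling water. Salt the water generously."
)
for c in chunks:
    print(c["method"], c["sentence_count"], c["text"][:60])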
2. Contextual Retrieval
Core Concept
Prepend a short, LLM-generated description of where each chunk sits in its source document, so the chunk remains interpretable, and more retrievable, when embedded and searched in isolation.
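Independent of provider, the essence of the technique is to prepend a short LLM-written situating passage to each chunk before embedding and indexing it. A minimal sketch, where llm_call stands for any completion function returning a string and the prompt wording is an assumption rather than a fixed standard:
def contextualize(llm_call, document: str, chunk: str) -> str:
    # Ask the LLM to situate the chunk within (a prefix of) the document
    prompt = (
        "Here is a document:\n" + document[:4000] +
        "\n\nHere is a chunk from it:\n" + chunk +
        "\n\nWrite 1-2 sentences situating this chunk within the document."
    )
    context = llm_call(prompt)
    return f"Context: {context}\n\nContent: {chunk}"
The full ContextualRetriever below implements the same pattern against the OpenAI chat API.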
Implementation
import openai
from typing import List, Dict, Optional
import json
import asyncio
class ContextualRetriever:
def __init__(self, api_key, model="gpt-3.5-turbo", max_context_length=200):
self.client = openai.OpenAI(api_key=api_key)
self.model = model
self.max_context_length = max_context_length
def contextual_chunking(self, text, base_chunker):
"""Add contextual information to chunks"""
# First, create base chunks
base_chunks = base_chunker.chunk(text)
# Generate context for each chunk
contextualized_chunks = []
total_chunks = len(base_chunks)
for i, chunk in enumerate(base_chunks):
print(f"Processing chunk {i + 1}/{total_chunks}")
# Generate contextual information
context = self._generate_context(chunk, text, i, total_chunks)
# Create contextualized chunk
contextualized_chunk = {
"original_text": chunk,
"context": context,
"contextualized_text": f"Context: {context}\n\nContent: {chunk}",
"chunk_index": i,
"total_chunks": total_chunks,
"method": "contextual_retrieval"
}
contextualized_chunks.append(contextualized_chunk)
return contextualized_chunks
def _generate_context(self, chunk, full_document, chunk_index, total_chunks):
"""Generate contextual information for a chunk"""
# Create a comprehensive prompt for context generation
prompt = self._create_context_prompt(
chunk, full_document, chunk_index, total_chunks
)
try:
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "You are an expert at providing contextual information for document chunks. Generate concise, relevant context that helps understand the chunk's place in the larger document."},
{"role": "user", "content": prompt}
],
max_tokens=300,
temperature=0.3
)
return response.choices[0].message.content.strip()
except Exception as e:
print(f"Error generating context: {e}")
return self._generate_fallback_context(chunk, chunk_index, total_chunks)
def _create_context_prompt(self, chunk, full_document, chunk_index, total_chunks):
"""Create prompt for context generation"""
# Get surrounding context
surrounding_context = self._get_surrounding_context(
full_document, chunk, chunk_index, total_chunks
)
prompt = f"""
I need you to provide brief contextual information for a chunk from a larger document.
Document position: Chunk {chunk_index + 1} of {total_chunks}
Surrounding context:
{surrounding_context}
Current chunk:
{chunk}
Please provide a concise context (maximum {self.max_context_length} characters) that:
1. Explains where this chunk fits in the overall document
2. Mentions key topics or themes immediately before/after
3. Helps understand the chunk's purpose and relevance
4. Is written in a clear, informative style
Context:
"""
return prompt
def _get_surrounding_context(self, full_document, chunk, chunk_index, total_chunks):
"""Extract context from surrounding parts of the document"""
# Find chunk position in document
chunk_start = full_document.find(chunk)
if chunk_start == -1:
return "Context not available"
# Extract surrounding text
context_window = 500 # characters
start_pos = max(0, chunk_start - context_window)
end_pos = min(len(full_document), chunk_start + len(chunk) + context_window)
surrounding_text = full_document[start_pos:end_pos]
# Highlight the chunk position
relative_start = chunk_start - start_pos
relative_end = relative_start + len(chunk)
before_chunk = surrounding_text[:relative_start]
after_chunk = surrounding_text[relative_end:]
context_parts = []
if before_chunk.strip():
context_parts.append(f"Before: {before_chunk.strip()[-100:]}...")
if after_chunk.strip():
context_parts.append(f"After: ...{after_chunk.strip()[:100]}")
return " | ".join(context_parts)
def _generate_fallback_context(self, chunk, chunk_index, total_chunks):
"""Generate simple fallback context"""
return f"This is chunk {chunk_index + 1} of {total_chunks} in the document."
async def async_contextual_chunking(self, text, base_chunker, max_concurrent=5):
"""Asynchronous contextual chunking for better performance"""
# Create base chunks first
base_chunks = base_chunker.chunk(text)
total_chunks = len(base_chunks)
# Create semaphore to limit concurrent API calls
semaphore = asyncio.Semaphore(max_concurrent)
async def process_chunk(chunk, index):
async with semaphore:
return await self._async_generate_context(chunk, text, index, total_chunks)
# Process all chunks concurrently
tasks = [
process_chunk(chunk, i) for i, chunk in enumerate(base_chunks)
]
contextualized_chunks = await asyncio.gather(*tasks)
# Add chunk information
for i, chunk in enumerate(base_chunks):
contextualized_chunks[i]["original_text"] = chunk
contextualized_chunks[i]["chunk_index"] = i
contextualized_chunks[i]["total_chunks"] = total_chunks
contextualized_chunks[i]["method"] = "async_contextual_retrieval"
return contextualized_chunks
async def _async_generate_context(self, chunk, full_document, chunk_index, total_chunks):
"""Asynchronous context generation"""
surrounding_context = self._get_surrounding_context(
full_document, chunk, chunk_index, total_chunks
)
prompt = self._create_context_prompt(
chunk, full_document, chunk_index, total_chunks
)
try:
# Note: This would require an async OpenAI client
# For now, we'll use the synchronous version
context = self._generate_context(chunk, full_document, chunk_index, total_chunks)
return {
"context": context,
"contextualized_text": f"Context: {context}\n\nContent: {chunk}"
}
except Exception as e:
print(f"Error in async context generation: {e}")
fallback_context = self._generate_fallback_context(chunk, chunk_index, total_chunks)
return {
"context": fallback_context,
"contextualized_text": f"Context: {fallback_context}\n\nContent: {chunk}"
}
def hierarchical_contextual_chunking(self, text, base_chunker):
"""Hierarchical contextual chunking with multiple context levels"""
# Create base chunks
base_chunks = base_chunker.chunk(text)
# Level 1: Document-level context
document_summary = self._generate_document_summary(text)
# Level 2: Section-level context
section_contexts = self._generate_section_contexts(text, base_chunks)
# Level 3: Local context for each chunk
contextualized_chunks = []
for i, chunk in enumerate(base_chunks):
local_context = self._generate_context(chunk, text, i, len(base_chunks))
# Combine all context levels
combined_context = f"""
Document Overview: {document_summary}
Section Context: {section_contexts.get(i, "Section context not available")}
Local Context: {local_context}
"""
contextualized_chunk = {
"original_text": chunk,
"document_context": document_summary,
"section_context": section_contexts.get(i, ""),
"local_context": local_context,
"combined_context": combined_context.strip(),
"contextualized_text": f"Context: {combined_context.strip()}\n\nContent: {chunk}",
"chunk_index": i,
"method": "hierarchical_contextual"
}
contextualized_chunks.append(contextualized_chunk)
return contextualized_chunks
def _generate_document_summary(self, text):
"""Generate a summary of the entire document"""
try:
prompt = f"""
Please provide a brief summary (maximum 100 words) of this document:
{text[:1000]}...
"""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "You are an expert at summarizing documents concisely."},
{"role": "user", "content": prompt}
],
max_tokens=150,
temperature=0.3
)
return response.choices[0].message.content.strip()
except Exception as e:
print(f"Error generating document summary: {e}")
return "Document summary not available"
    def _generate_section_contexts(self, text, chunks):
        """Generate context for different sections of the document"""
        section_contexts = {}
        # Simple section detection based on position
        total_chunks = len(chunks)
        sections = min(3, max(1, total_chunks))  # Divide into up to 3 sections
        chunks_per_section = max(1, total_chunks // sections)  # avoid division by zero
        for i in range(total_chunks):
            section_number = min(i // chunks_per_section, sections - 1)
            if section_number not in section_contexts:
                section_contexts[section_number] = self._generate_section_context(
                    text, section_number, sections
                )
        return {i: section_contexts[min(i // chunks_per_section, sections - 1)]
                for i in range(total_chunks)}
def _generate_section_context(self, text, section_number, total_sections):
"""Generate context for a specific section"""
try:
section_size = len(text) // total_sections
start_pos = section_number * section_size
end_pos = min((section_number + 1) * section_size, len(text))
section_text = text[start_pos:end_pos]
prompt = f"""
Provide brief context for section {section_number + 1} of {total_sections} in this document:
Section text:
{section_text[:500]}...
Context (maximum 50 words):
"""
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": "You are an expert at providing document section context."},
{"role": "user", "content": prompt}
],
max_tokens=100,
temperature=0.3
)
return response.choices[0].message.content.strip()
except Exception as e:
print(f"Error generating section context: {e}")
return f"This is section {section_number + 1} of the document"
3. Late Chunking
Core Concept
Embed the entire document in a single forward pass, so that every token embedding is conditioned on full-document context, and only then pool the token embeddings into chunk embeddings: the chunk boundaries are applied late, after embedding, hence the name.
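A minimal sketch of the pooling step, assuming token_embeddings came from one forward pass of a long-context embedding model over the whole document:
import torch
def pool_chunks(token_embeddings: torch.Tensor, span_size: int = 256) -> torch.Tensor:
    # token_embeddings: [seq_len, hidden_dim]; each token vector already
    # carries full-document context from the single forward pass
    chunk_vecs = [
        token_embeddings[start:start + span_size].mean(dim=0)  # mean pooling
        for start in range(0, token_embeddings.shape[0], span_size)
    ]
    return torch.stack(chunk_vecs)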
Implementation
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModel
from typing import List, Dict, Tuple
import logging
class LateChunker:
def __init__(self, model_name="sentence-transformers/all-MiniLM-L6-v2",
device="cpu", chunk_size=512):
self.device = device
self.chunk_size = chunk_size
# Load tokenizer and model
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModel.from_pretrained(model_name)
self.model.to(device)
self.model.eval()
def late_chunk_embedding(self, text, chunk_sizes=None):
"""Generate late chunk embeddings"""
if chunk_sizes is None:
chunk_sizes = [256, 512, 1024] # Multiple chunk sizes to try
        results = {}
        # Tokenize and embed the entire document once; the per-size loop below
        # only changes how the token embeddings are pooled. Truncation is off,
        # so documents longer than the model's context window (512 tokens for
        # MiniLM-class models) require a long-context embedding model.
        encoded = self.tokenizer(
            text,
            return_tensors="pt",
            truncation=False,
            padding=False
        ).to(self.device)
        input_ids = encoded["input_ids"][0]
        attention_mask = encoded["attention_mask"][0]
        # Generate embeddings for all tokens
        with torch.no_grad():
            outputs = self.model(**encoded, output_hidden_states=True)
        # Use last hidden state
        token_embeddings = outputs.last_hidden_state[0]  # Shape: [seq_len, hidden_dim]
        for chunk_size in chunk_sizes:
# Create chunks from token embeddings
chunk_embeddings = self._create_chunk_embeddings(
token_embeddings, attention_mask, chunk_size, input_ids
)
# Create chunk texts
chunk_texts = self._create_chunk_texts(input_ids, chunk_size)
results[chunk_size] = {
"chunk_embeddings": chunk_embeddings,
"chunk_texts": chunk_texts,
"chunk_size": chunk_size,
"method": "late_chunking"
}
return results
def _create_chunk_embeddings(self, token_embeddings, attention_mask, chunk_size, input_ids):
"""Create chunk embeddings from token embeddings"""
chunk_embeddings = []
valid_token_indices = torch.where(attention_mask == 1)[0]
for i in range(0, len(valid_token_indices), chunk_size):
end_idx = min(i + chunk_size, len(valid_token_indices))
chunk_token_indices = valid_token_indices[i:end_idx]
# Get embeddings for tokens in this chunk
chunk_token_embeddings = token_embeddings[chunk_token_indices]
# Pool token embeddings to create chunk embedding
# Using mean pooling
chunk_embedding = torch.mean(chunk_token_embeddings, dim=0)
chunk_embeddings.append(chunk_embedding.cpu().numpy())
return chunk_embeddings
def _create_chunk_texts(self, input_ids, chunk_size):
"""Create text for each chunk"""
chunk_texts = []
total_tokens = input_ids.shape[0]
for i in range(0, total_tokens, chunk_size):
end_idx = min(i + chunk_size, total_tokens)
chunk_ids = input_ids[i:end_idx]
chunk_text = self.tokenizer.decode(chunk_ids, skip_special_tokens=True)
chunk_texts.append(chunk_text)
return chunk_texts
def adaptive_late_chunking(self, text, complexity_analyzer=None):
"""Adaptive late chunking based on document complexity"""
# Analyze document complexity
if complexity_analyzer:
complexity = complexity_analyzer.analyze_complexity(text)
else:
complexity = self._simple_complexity_analysis(text)
# Adjust chunk size based on complexity
if complexity < 0.3: # Simple document
chunk_sizes = [512, 1024]
elif complexity < 0.7: # Medium complexity
chunk_sizes = [256, 512, 1024]
else: # Complex document
chunk_sizes = [128, 256, 512]
results = self.late_chunk_embedding(text, chunk_sizes)
# Evaluate and select best chunk size
best_chunk_size = self._evaluate_chunk_sizes(results, complexity)
return {
"best_results": results[best_chunk_size],
"all_results": results,
"selected_chunk_size": best_chunk_size,
"complexity": complexity,
"method": "adaptive_late_chunking"
}
def _simple_complexity_analysis(self, text):
"""Simple complexity analysis"""
# Factors: average sentence length, vocabulary diversity, punctuation complexity
        sentences = [s for s in text.split('.') if s.strip()]
        if not sentences:
            return 0.0
        avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences)
unique_words = len(set(text.lower().split()))
total_words = len(text.split())
vocab_diversity = unique_words / total_words if total_words > 0 else 0
# Normalize and combine
length_score = min(avg_sentence_length / 20, 1.0) # Normalize to 0-1
diversity_score = vocab_diversity
complexity = (length_score + diversity_score) / 2
return complexity
def _evaluate_chunk_sizes(self, results, complexity):
"""Evaluate different chunk sizes and select the best one"""
best_chunk_size = list(results.keys())[0]
best_score = 0.0
for chunk_size, result in results.items():
score = self._calculate_chunking_score(result, complexity)
result["score"] = score
if score > best_score:
best_score = score
best_chunk_size = chunk_size
return best_chunk_size
def _calculate_chunking_score(self, result, complexity):
"""Calculate quality score for a chunking result"""
chunk_texts = result["chunk_texts"]
chunk_embeddings = result["chunk_embeddings"]
if not chunk_texts or not chunk_embeddings:
return 0.0
# Factors to consider:
# 1. Number of chunks (moderate number is better)
# 2. Chunk size consistency
# 3. Content preservation (estimated)
num_chunks = len(chunk_texts)
chunk_lengths = [len(text.split()) for text in chunk_texts]
# Score based on optimal number of chunks (5-15 is ideal)
if 5 <= num_chunks <= 15:
chunk_count_score = 1.0
elif num_chunks < 5:
chunk_count_score = num_chunks / 5.0
else:
chunk_count_score = max(0.0, 1.0 - (num_chunks - 15) / 20.0)
# Score based on size consistency
if chunk_lengths:
mean_length = np.mean(chunk_lengths)
std_length = np.std(chunk_lengths)
consistency_score = max(0.0, 1.0 - (std_length / mean_length))
else:
consistency_score = 0.0
# Adjust score based on document complexity
complexity_adjustment = 0.5 + complexity * 0.5
total_score = (
chunk_count_score * 0.4 +
consistency_score * 0.3 +
complexity_adjustment * 0.3
)
return total_score
def contextual_late_chunking(self, text, context_generator=None):
"""Combine late chunking with contextual information"""
# Generate late chunk embeddings
late_results = self.late_chunk_embedding(text)
# Use best chunk size (default to 512)
best_chunk_size = 512
if best_chunk_size in late_results:
result = late_results[best_chunk_size]
else:
result = list(late_results.values())[0]
chunk_texts = result["chunk_texts"]
chunk_embeddings = result["chunk_embeddings"]
# Add contextual information if available
if context_generator:
contextualized_chunks = []
for i, chunk_text in enumerate(chunk_texts):
context = context_generator.generate_context(
chunk_text, text, i, len(chunk_texts)
)
contextualized_chunk = {
"text": chunk_text,
"embedding": chunk_embeddings[i],
"context": context,
"contextualized_text": f"Context: {context}\n\nContent: {chunk_text}",
"chunk_index": i,
"method": "contextual_late_chunking"
}
contextualized_chunks.append(contextualized_chunk)
return {
"chunks": contextualized_chunks,
"chunk_size": best_chunk_size,
"method": "contextual_late_chunking"
}
# Return without context
chunks = []
for i, (chunk_text, embedding) in enumerate(zip(chunk_texts, chunk_embeddings)):
chunks.append({
"text": chunk_text,
"embedding": embedding,
"chunk_index": i,
"method": "late_chunking"
})
return {
"chunks": chunks,
"chunk_size": best_chunk_size,
"method": "late_chunking"
}
def semantic_late_chunking(self, text, semantic_model=None):
"""Combine late chunking with semantic boundary detection"""
# Generate late chunks
late_results = self.late_chunk_embedding(text)
best_chunk_size = 512
if best_chunk_size in late_results:
result = late_results[best_chunk_size]
else:
result = list(late_results.values())[0]
chunk_texts = result["chunk_texts"]
chunk_embeddings = result["chunk_embeddings"]
# Use semantic model to analyze chunk boundaries
if semantic_model:
# Analyze semantic coherence between chunks
semantic_chunks = self._analyze_semantic_boundaries(
chunk_texts, chunk_embeddings, semantic_model
)
else:
# Simple semantic analysis using the embeddings we already have
semantic_chunks = self._simple_semantic_analysis(
chunk_texts, chunk_embeddings
)
return {
"chunks": semantic_chunks,
"chunk_size": best_chunk_size,
"method": "semantic_late_chunking"
}
def _analyze_semantic_boundaries(self, chunk_texts, chunk_embeddings, semantic_model):
"""Analyze semantic boundaries using external semantic model"""
# This would use a semantic model to analyze boundaries
# For now, return chunks with similarity information
chunks = []
for i, (text, embedding) in enumerate(zip(chunk_texts, chunk_embeddings)):
chunk_data = {
"text": text,
"embedding": embedding,
"chunk_index": i,
"method": "semantic_late_chunking"
}
# Calculate similarity with adjacent chunks
if i > 0:
prev_embedding = chunk_embeddings[i - 1]
similarity = np.dot(embedding, prev_embedding) / (
np.linalg.norm(embedding) * np.linalg.norm(prev_embedding)
)
chunk_data["similarity_with_previous"] = similarity
if i < len(chunk_texts) - 1:
next_embedding = chunk_embeddings[i + 1]
similarity = np.dot(embedding, next_embedding) / (
np.linalg.norm(embedding) * np.linalg.norm(next_embedding)
)
chunk_data["similarity_with_next"] = similarity
chunks.append(chunk_data)
return chunks
def _simple_semantic_analysis(self, chunk_texts, chunk_embeddings):
"""Simple semantic analysis using available embeddings"""
chunks = []
for i, (text, embedding) in enumerate(zip(chunk_texts, chunk_embeddings)):
chunk_data = {
"text": text,
"embedding": embedding,
"chunk_index": i,
"method": "simple_semantic_late_chunking"
}
# Calculate similarity with adjacent chunks
if i > 0:
prev_embedding = chunk_embeddings[i - 1]
similarity = np.dot(embedding, prev_embedding) / (
np.linalg.norm(embedding) * np.linalg.norm(prev_embedding)
)
chunk_data["similarity_with_previous"] = similarity
if i < len(chunk_texts) - 1:
next_embedding = chunk_embeddings[i + 1]
similarity = np.dot(embedding, next_embedding) / (
np.linalg.norm(embedding) * np.linalg.norm(next_embedding)
)
chunk_data["similarity_with_next"] = similarity
chunks.append(chunk_data)
return chunks
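A usage sketch for the class above; short_text is a placeholder, and note again that MiniLM-class models cap input at 512 tokens, so real documents need a long-context embedding model:
late_chunker = LateChunker(chunk_size=128)
results = late_chunker.late_chunk_embedding(short_text, chunk_sizes=[128])
for text, emb in zip(results[128]["chunk_texts"],
                     results[128]["chunk_embeddings"]):
    print(len(text.split()), "words ->", emb.shape)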
4. Usage Examples and Integration
Complete Semantic Chunking Pipeline
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticChunkingPipeline:
def __init__(self, config):
self.config = config
self.semantic_chunker = AdvancedSemanticChunker(
model_name=config.get("semantic_model", "all-MiniLM-L6-v2"),
boundary_threshold=config.get("boundary_threshold", 0.7)
)
self.contextual_retriever = ContextualRetriever(
api_key=config.get("openai_api_key"),
model=config.get("context_model", "gpt-3.5-turbo")
)
self.late_chunker = LateChunker(
model_name=config.get("embedding_model", "sentence-transformers/all-MiniLM-L6-v2")
)
def process_document(self, text, method="hybrid"):
"""Process document using specified semantic method"""
if method == "semantic":
return self.semantic_chunker.multi_level_semantic_chunking(text)
elif method == "contextual":
base_chunker = FixedSizeChunker(chunk_size=512, chunk_overlap=50)
return self.contextual_retriever.contextual_chunking(text, base_chunker)
elif method == "late":
return self.late_chunker.adaptive_late_chunking(text)
elif method == "hybrid":
# Combine multiple approaches
semantic_chunks = self.semantic_chunker.multi_level_semantic_chunking(text)
# Add contextual information to semantic chunks
contextualized_chunks = []
for i, chunk in enumerate(semantic_chunks):
context = self._generate_simple_context(chunk["text"], text, i)
contextualized_chunk = chunk.copy()
contextualized_chunk.update({
"context": context,
"contextualized_text": f"Context: {context}\n\nContent: {chunk['text']}",
"method": "hybrid_semantic_contextual"
})
contextualized_chunks.append(contextualized_chunk)
return contextualized_chunks
else:
raise ValueError(f"Unknown method: {method}")
    def _generate_simple_context(self, chunk, full_text, chunk_index):
        """Generate simple context without API calls"""
        # Find chunk position in the full document
        chunk_start = full_text.find(chunk)
        if chunk_start == -1:
            return f"Chunk {chunk_index + 1}"
        return f"Part of larger document (position {chunk_start}-{chunk_start + len(chunk)})"
def evaluate_chunking(self, chunks, text):
"""Evaluate chunking quality"""
if not chunks:
return {"overall_score": 0.0, "metrics": {}}
        # Calculate metrics
        metrics = {
            "coherence": self._calculate_coherence(chunks),
            "coverage": self._calculate_coverage(chunks, text),
            "size_distribution": self._analyze_size_distribution(chunks),
            "boundary_quality": self._assess_boundary_quality(chunks)
        }
# Calculate overall score
weights = {"coherence": 0.3, "coverage": 0.3, "size_distribution": 0.2, "boundary_quality": 0.2}
overall_score = sum(metrics[metric] * weights[metric] for metric in weights)
return {
"overall_score": overall_score,
"metrics": metrics,
"chunks_count": len(chunks)
}
def _extract_sentences(self, text):
"""Extract sentences for evaluation"""
import re
sentences = re.split(r'[.!?]+', text)
return [s.strip() for s in sentences if s.strip()]
    def _calculate_coherence(self, chunks):
        """Calculate semantic coherence as mean consecutive-sentence similarity within chunks"""
        coherence_scores = []
        for chunk in chunks:
            chunk_text = chunk.get("text", chunk.get("original_text", ""))
            chunk_sentences = self._extract_sentences(chunk_text)
            if len(chunk_sentences) > 1:
                # Embed the chunk's own sentences directly; matching them back
                # to document-level sentence indices is fragile when chunkers
                # alter whitespace or punctuation
                chunk_embeddings = self.semantic_chunker.model.encode(chunk_sentences)
                similarities = [
                    cosine_similarity(
                        chunk_embeddings[i].reshape(1, -1),
                        chunk_embeddings[i + 1].reshape(1, -1)
                    )[0][0]
                    for i in range(len(chunk_embeddings) - 1)
                ]
                if similarities:
                    coherence_scores.append(np.mean(similarities))
        return np.mean(coherence_scores) if coherence_scores else 0.0
def _calculate_coverage(self, chunks, original_text):
"""Calculate how well chunks cover the original text"""
combined_chunk_text = " ".join([
chunk.get("text", chunk.get("original_text", "")) for chunk in chunks
])
# Simple coverage based on text overlap
original_words = set(original_text.lower().split())
chunk_words = set(combined_chunk_text.lower().split())
if not original_words:
return 0.0
coverage = len(original_words & chunk_words) / len(original_words)
return coverage
def _analyze_size_distribution(self, chunks):
"""Analyze the distribution of chunk sizes"""
sizes = [len(chunk.get("text", chunk.get("original_text", "")).split())
for chunk in chunks]
if not sizes:
return 0.0
mean_size = np.mean(sizes)
std_size = np.std(sizes)
# Ideal distribution has low variance
consistency_score = max(0.0, 1.0 - (std_size / mean_size))
return consistency_score
def _assess_boundary_quality(self, chunks):
"""Assess the quality of chunk boundaries"""
if len(chunks) < 2:
return 1.0
boundary_scores = []
for i in range(len(chunks) - 1):
current_chunk = chunks[i].get("text", chunks[i].get("original_text", ""))
next_chunk = chunks[i + 1].get("text", chunks[i + 1].get("original_text", ""))
# Check if chunks end/begin naturally
ends_naturally = current_chunk.strip().endswith(('.', '!', '?', ':', ';'))
begins_naturally = next_chunk.strip()[0].isupper() if next_chunk.strip() else False
boundary_score = 0.0
if ends_naturally:
boundary_score += 0.5
if begins_naturally:
boundary_score += 0.5
boundary_scores.append(boundary_score)
return np.mean(boundary_scores) if boundary_scores else 0.0
# Usage Example
if __name__ == "__main__":
config = {
"semantic_model": "all-MiniLM-L6-v2",
"context_model": "gpt-3.5-turbo",
"embedding_model": "sentence-transformers/all-MiniLM-L6-v2",
"openai_api_key": "your-api-key-here"
}
pipeline = SemanticChunkingPipeline(config)
# Sample document
sample_text = """
Natural language processing has evolved significantly over the past decade.
Modern transformer models have revolutionized how we approach text understanding and generation.
These models use attention mechanisms to process input text in parallel.
The attention mechanism allows the model to focus on different parts of the input when producing each part of the output.
Retrieval-Augmented Generation (RAG) combines the power of large language models with external knowledge retrieval.
This approach enables models to access up-to-date information beyond their training data.
RAG systems typically consist of three main components: a retriever, a knowledge base, and a generator.
The retriever finds relevant documents from the knowledge base based on the user's query.
The generator then uses these retrieved documents to produce a more informed response.
"""
    # Process with different methods
    semantic_chunks = pipeline.process_document(sample_text, method="semantic")
    contextual_chunks = pipeline.process_document(sample_text, method="contextual")
    late_results = pipeline.process_document(sample_text, method="late")
    hybrid_chunks = pipeline.process_document(sample_text, method="hybrid")
    # Adaptive late chunking returns a results dict rather than a chunk list;
    # wrap its chunk texts so it can be scored alongside the other methods
    late_chunks = [{"text": t}
                   for t in late_results["best_results"]["chunk_texts"]]
    # Evaluate results
    methods = {
        "semantic": semantic_chunks,
        "contextual": contextual_chunks,
        "late": late_chunks,
        "hybrid": hybrid_chunks
    }
for method_name, chunks in methods.items():
evaluation = pipeline.evaluate_chunking(chunks, sample_text)
print(f"\n{method_name.upper()} Method:")
print(f" Chunks: {len(chunks)}")
print(f" Overall Score: {evaluation['overall_score']:.3f}")
print(f" Coherence: {evaluation['metrics']['coherence']:.3f}")
print(f" Coverage: {evaluation['metrics']['coverage']:.3f}")
These semantic and contextual chunking methods trade extra computation (embedding passes, clustering, and LLM calls) for chunks that follow topic boundaries and carry document-level context, which typically improves retrieval precision over purely size-based chunking.