Complete Implementation Guidelines
This document provides comprehensive implementation guidance for building effective chunking systems.
System Architecture
Core Components
Document Processor
├── Ingestion Layer
│   ├── Document Type Detection
│   ├── Format Parsing (PDF, HTML, Markdown, etc.)
│   └── Content Extraction
├── Analysis Layer
│   ├── Structure Analysis
│   ├── Content Type Identification
│   └── Complexity Assessment
├── Strategy Selection Layer
│   ├── Rule-based Selection
│   ├── ML-based Prediction
│   └── Adaptive Configuration
├── Chunking Layer
│   ├── Strategy Implementation
│   ├── Parameter Optimization
│   └── Quality Validation
└── Output Layer
    ├── Chunk Metadata Generation
    ├── Embedding Integration
    └── Storage Preparation
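The layers above can be wired together in a thin orchestrator. The sketch below is a minimal, illustrative composition that uses the DocumentAnalyzer, StrategySelector, and ChunkQualityAssessor classes defined later in this guide; the DocumentProcessor name and the returned dictionary shape are assumptions, not a prescribed API.

# Minimal orchestration sketch (illustrative only). It composes classes defined
# later in this guide; the class name and return shape are assumptions.
class DocumentProcessor:
    def __init__(self):
        self.analyzer = DocumentAnalyzer()       # Analysis Layer
        self.selector = StrategySelector()       # Strategy Selection Layer
        self.assessor = ChunkQualityAssessor()   # Quality Validation

    def process(self, content: str) -> dict:
        analysis = self.analyzer.analyze(content)
        strategy = self.selector.get_strategy(analysis)
        chunks = strategy.chunk(content, analysis)
        quality = self.assessor.assess_chunks(chunks, analysis)
        return {"analysis": analysis, "chunks": chunks, "quality": quality}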
Pre-processing Pipeline
Document Analysis Framework
from dataclasses import dataclass
from typing import List, Dict, Any
import re


@dataclass
class DocumentAnalysis:
    doc_type: str
    structure_score: float   # 0-1, higher means more structured
    complexity_score: float  # 0-1, higher means more complex
    content_types: List[str]
    language: str
    estimated_tokens: int
    has_multimodal: bool


class DocumentAnalyzer:
    def __init__(self):
        self.structure_patterns = {
            'markdown': [r'^#+\s', r'^\*\*.*\*\*$', r'^\* ', r'^\d+\. '],
            'html': [r'<h[1-6]>', r'<p>', r'<div>', r'<table>'],
            'latex': [r'\\section', r'\\subsection', r'\\begin\{', r'\\end\{'],
            'academic': [r'^\d+\.', r'^\d+\.\d+', r'^[A-Z]\.', r'^Figure \d+']
        }

    def analyze(self, content: str) -> DocumentAnalysis:
        doc_type = self.detect_document_type(content)
        structure_score = self.calculate_structure_score(content, doc_type)
        complexity_score = self.calculate_complexity_score(content)
        content_types = self.identify_content_types(content)
        language = self.detect_language(content)
        estimated_tokens = self.estimate_tokens(content)
        has_multimodal = self.detect_multimodal_content(content)

        return DocumentAnalysis(
            doc_type=doc_type,
            structure_score=structure_score,
            complexity_score=complexity_score,
            content_types=content_types,
            language=language,
            estimated_tokens=estimated_tokens,
            has_multimodal=has_multimodal
        )
    def detect_document_type(self, content: str) -> str:
        content_lower = content.lower()
        if '<html' in content_lower or '<body' in content_lower:
            return 'html'
        elif '#' in content and '##' in content:
            return 'markdown'
        elif '\\documentclass' in content_lower or '\\begin{' in content_lower:
            return 'latex'
        elif any(keyword in content_lower for keyword in ['abstract', 'introduction', 'conclusion', 'references']):
            return 'academic'
        elif 'def ' in content or 'class ' in content or 'function ' in content_lower:
            return 'code'
        else:
            return 'plain'

    def calculate_structure_score(self, content: str, doc_type: str) -> float:
        patterns = self.structure_patterns.get(doc_type, [])
        if not patterns:
            return 0.5  # Default for unstructured content

        line_count = len(content.split('\n'))
        structured_lines = 0
        for line in content.split('\n'):
            for pattern in patterns:
                if re.search(pattern, line.strip()):
                    structured_lines += 1
                    break

        return min(structured_lines / max(line_count, 1), 1.0)

    def calculate_complexity_score(self, content: str) -> float:
        # Factors that increase complexity
        avg_sentence_length = self.calculate_avg_sentence_length(content)
        vocabulary_richness = self.calculate_vocabulary_richness(content)
        nested_structure = self.detect_nested_structure(content)

        # Normalize and combine as a weighted sum
        complexity = (
            min(avg_sentence_length / 30, 1.0) * 0.3 +
            vocabulary_richness * 0.4 +
            nested_structure * 0.3
        )
        return min(complexity, 1.0)
    def identify_content_types(self, content: str) -> List[str]:
        types = []
        if '```' in content or 'def ' in content or 'function ' in content.lower():
            types.append('code')
        if '|' in content and '\n' in content:
            types.append('tables')
        if re.search(r'!\[.*\]\(.*\)', content):
            types.append('images')
        if re.search(r'https?://', content):
            types.append('links')
        if re.search(r'\d+\.\d+', content) or re.search(r'\$\d', content):
            types.append('numbers')
        return types if types else ['text']

    def detect_language(self, content: str) -> str:
        # Simple script-based heuristic - can be replaced with a proper
        # language-detection library for production use
        if re.search(r'[\u4e00-\u9fff]', content):
            return 'chinese'
        elif re.search(r'[\u0600-\u06ff]', content):
            return 'arabic'
        elif re.search(r'[\u0400-\u04ff]', content):
            return 'russian'
        else:
            return 'english'  # Default assumption

    def estimate_tokens(self, content: str) -> int:
        # Rough estimation - actual tokenization varies by model
        word_count = len(content.split())
        return int(word_count * 1.3)  # Average tokens per word
    def detect_multimodal_content(self, content: str) -> bool:
        multimodal_indicators = [
            r'!\[.*\]\(.*\)',  # Markdown images
            r'<iframe',        # Embedded content
            r'<object',        # Embedded objects
            r'<embed',         # Embedded media
        ]
        return any(re.search(pattern, content) for pattern in multimodal_indicators)

    def calculate_avg_sentence_length(self, content: str) -> float:
        sentences = re.split(r'[.!?]+', content)
        sentences = [s.strip() for s in sentences if s.strip()]
        if not sentences:
            return 0.0
        return sum(len(s.split()) for s in sentences) / len(sentences)

    def calculate_vocabulary_richness(self, content: str) -> float:
        words = content.lower().split()
        if not words:
            return 0.0
        unique_words = set(words)
        return len(unique_words) / len(words)

    def detect_nested_structure(self, content: str) -> float:
        # Detect nested lists, indented content, etc.
        lines = content.split('\n')
        indented_lines = 0
        for line in lines:
            if line.strip() and line.startswith(' '):
                indented_lines += 1
        return indented_lines / max(len(lines), 1)
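A quick usage sketch showing how the analyzer output feeds strategy selection. The sample document and the printed comments are illustrative only.

# Illustrative usage of DocumentAnalyzer; the sample document is made up.
analyzer = DocumentAnalyzer()

sample = "# Release Notes\n\n## Fixes\n* Resolved crash on startup.\n* Improved PDF parsing."
analysis = analyzer.analyze(sample)

print(analysis.doc_type)          # 'markdown'
print(analysis.structure_score)   # fraction of lines matching markdown patterns
print(analysis.estimated_tokens)  # rough word-count-based estimate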
Strategy Selection Engine
from abc import ABC, abstractmethod
from typing import List, Dict, Any


class ChunkingStrategy(ABC):
    @abstractmethod
    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        pass


class StrategySelector:
    def __init__(self):
        self.strategies = {
            'fixed_size': FixedSizeStrategy(),
            'recursive': RecursiveStrategy(),
            'structure_aware': StructureAwareStrategy(),
            'semantic': SemanticStrategy(),
            'adaptive': AdaptiveStrategy()
        }

    def select_strategy(self, analysis: DocumentAnalysis) -> str:
        # Rule-based selection logic
        if analysis.structure_score > 0.8 and analysis.doc_type in ['markdown', 'html', 'latex']:
            return 'structure_aware'
        elif analysis.complexity_score > 0.7 and analysis.estimated_tokens < 10000:
            return 'semantic'
        elif analysis.doc_type == 'code':
            return 'structure_aware'
        elif analysis.structure_score < 0.3:
            return 'fixed_size'
        elif analysis.complexity_score > 0.5:
            return 'recursive'
        else:
            return 'adaptive'

    def get_strategy(self, analysis: DocumentAnalysis) -> ChunkingStrategy:
        strategy_name = self.select_strategy(analysis)
        return self.strategies[strategy_name]
# Example strategy implementations
class FixedSizeStrategy(ChunkingStrategy):
    def __init__(self, default_size=512, default_overlap=50):
        self.default_size = default_size
        self.default_overlap = default_overlap

    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Adjust parameters based on analysis
        if analysis.complexity_score > 0.7:
            chunk_size = 1024
        elif analysis.complexity_score < 0.3:
            chunk_size = 256
        else:
            chunk_size = self.default_size

        overlap = int(chunk_size * 0.1)  # 10% overlap
        return self._fixed_size_chunk(content, chunk_size, overlap)

    def _fixed_size_chunk(self, content: str, chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
        # Implementation using RecursiveCharacterTextSplitter or custom logic
        # (see the sketch after this section)
        pass
class AdaptiveStrategy(ChunkingStrategy):
    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Combine multiple strategies based on content characteristics
        structured_chunks: List[Dict[str, Any]] = []
        unstructured_chunks: List[Dict[str, Any]] = []

        if analysis.structure_score > 0.6:
            # Use structure-aware chunking for structured parts
            structured_chunks = self._chunk_structured_parts(content, analysis)
        else:
            # Use fixed-size chunking for unstructured parts
            unstructured_chunks = self._chunk_unstructured_parts(content, analysis)

        # Merge and optimize
        return self._merge_chunks(structured_chunks + unstructured_chunks)

    def _chunk_structured_parts(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Implementation for structured content
        pass

    def _chunk_unstructured_parts(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Implementation for unstructured content
        pass

    def _merge_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Implementation for merging and optimizing chunks
        pass
Quality Assurance Framework
Chunk Quality Metrics
import re
from typing import List, Dict, Any

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


class ChunkQualityAssessor:
    def __init__(self):
        self.quality_weights = {
            'coherence': 0.3,
            'completeness': 0.25,
            'size_appropriateness': 0.2,
            'semantic_similarity': 0.15,
            'boundary_quality': 0.1
        }

    def assess_chunks(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> Dict[str, float]:
        scores = {}

        # Coherence: do chunks make sense on their own?
        scores['coherence'] = self._assess_coherence(chunks)

        # Completeness: do chunks preserve important information?
        scores['completeness'] = self._assess_completeness(chunks, analysis)

        # Size appropriateness: are chunks within the optimal size range?
        scores['size_appropriateness'] = self._assess_size(chunks)

        # Semantic similarity: are chunks thematically consistent?
        scores['semantic_similarity'] = self._assess_semantic_consistency(chunks)

        # Boundary quality: are chunk boundaries placed well?
        scores['boundary_quality'] = self._assess_boundary_quality(chunks)

        # Calculate the overall quality score as a weighted sum
        overall_score = sum(
            score * self.quality_weights[metric]
            for metric, score in scores.items()
        )
        scores['overall'] = overall_score

        return scores
    def _assess_coherence(self, chunks: List[Dict[str, Any]]) -> float:
        # Simple heuristic-based coherence assessment
        coherence_scores = []
        for chunk in chunks:
            content = chunk['content']
            # Check for complete sentences
            sentences = re.split(r'[.!?]+', content)
            complete_sentences = sum(1 for s in sentences if s.strip())
            coherence = complete_sentences / max(len(sentences), 1)
            coherence_scores.append(coherence)
        return np.mean(coherence_scores)

    def _assess_completeness(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> float:
        # Check whether important structural elements are preserved
        if analysis.doc_type in ['markdown', 'html']:
            return self._assess_structure_preservation(chunks, analysis)
        else:
            return self._assess_content_preservation(chunks)

    def _assess_structure_preservation(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> float:
        # Check whether headings, lists, and other structural elements are preserved
        preserved_elements = 0
        total_elements = 0
        for chunk in chunks:
            content = chunk['content']
            # Count preserved structural elements
            headings = len(re.findall(r'^#+\s', content, re.MULTILINE))
            lists = len(re.findall(r'^\s*[-*+]\s', content, re.MULTILINE))
            preserved_elements += headings + lists
            total_elements += 1  # Simplified count
        return preserved_elements / max(total_elements, 1)

    def _assess_content_preservation(self, chunks: List[Dict[str, Any]]) -> float:
        # Simple check based on content ratio
        total_content = ''.join(chunk['content'] for chunk in chunks)
        # This would need comparison with the original content
        return 0.8  # Placeholder

    def _assess_size(self, chunks: List[Dict[str, Any]]) -> float:
        optimal_min = 100   # tokens
        optimal_max = 1000  # tokens
        size_scores = []
        for chunk in chunks:
            token_count = self._estimate_tokens(chunk['content'])
            if optimal_min <= token_count <= optimal_max:
                score = 1.0
            elif token_count < optimal_min:
                score = token_count / optimal_min
            else:
                score = max(0, 1 - (token_count - optimal_max) / optimal_max)
            size_scores.append(score)
        return np.mean(size_scores)

    def _assess_semantic_consistency(self, chunks: List[Dict[str, Any]]) -> float:
        # This would require an embedding model for an actual implementation
        # Placeholder implementation
        return 0.7

    def _assess_boundary_quality(self, chunks: List[Dict[str, Any]]) -> float:
        # Check that boundaries do not split important content
        boundary_scores = []
        for chunk in chunks:
            content = chunk['content']
            # Penalize chunks that end mid-sentence
            if not content.strip().endswith(('.', '!', '?', '>', '}')):
                boundary_scores.append(0.5)
            else:
                boundary_scores.append(1.0)
        return np.mean(boundary_scores)

    def _estimate_tokens(self, content: str) -> int:
        # Simple token estimation (roughly 4 tokens per 3 words)
        return len(content.split()) * 4 // 3
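The `_assess_semantic_consistency` placeholder above can be replaced with an embedding-based check. The sketch below is one possible approach, assuming the `sentence-transformers` package is installed; the `all-MiniLM-L6-v2` model name is an assumption, not a requirement of this guide. It scores consistency as the average cosine similarity between adjacent chunks.

# Possible embedding-based replacement for _assess_semantic_consistency.
# Assumes sentence-transformers is available; the model name is an assumption.
from typing import List, Dict, Any

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity


def assess_semantic_consistency(chunks: List[Dict[str, Any]],
                                model_name: str = 'all-MiniLM-L6-v2') -> float:
    if len(chunks) < 2:
        return 1.0  # A single chunk is trivially consistent

    model = SentenceTransformer(model_name)
    embeddings = model.encode([chunk['content'] for chunk in chunks])

    # Average cosine similarity between adjacent chunks
    similarities = [
        cosine_similarity([embeddings[i]], [embeddings[i + 1]])[0][0]
        for i in range(len(embeddings) - 1)
    ]
    return float(np.mean(similarities))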
Error Handling and Edge Cases
Robust Error Handling
import logging
from typing import Optional, List, Dict, Any
from dataclasses import dataclass


@dataclass
class ChunkingError:
    error_type: str
    message: str
    chunk_index: Optional[int] = None
    recovery_action: Optional[str] = None


class ChunkingErrorHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_handlers = {
            'empty_content': self._handle_empty_content,
            'oversized_chunk': self._handle_oversized_chunk,
            'encoding_error': self._handle_encoding_error,
            'memory_error': self._handle_memory_error,
            'structure_parsing_error': self._handle_structure_parsing_error
        }

    def handle_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        error_type = self._classify_error(error)
        handler = self.error_handlers.get(error_type, self._handle_generic_error)
        return handler(error, context)
    def _classify_error(self, error: Exception) -> str:
        if isinstance(error, ValueError) and 'empty' in str(error).lower():
            return 'empty_content'
        elif isinstance(error, MemoryError):
            return 'memory_error'
        elif isinstance(error, UnicodeError):
            return 'encoding_error'
        elif 'too large' in str(error).lower():
            return 'oversized_chunk'
        elif 'parsing' in str(error).lower():
            return 'structure_parsing_error'
        else:
            return 'generic_error'

    def _handle_empty_content(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Empty content encountered: {error}")
        return ChunkingError(
            error_type='empty_content',
            message=str(error),
            recovery_action='skip_empty_content'
        )

    def _handle_oversized_chunk(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Oversized chunk detected: {error}")
        return ChunkingError(
            error_type='oversized_chunk',
            message=str(error),
            chunk_index=context.get('chunk_index'),
            recovery_action='reduce_chunk_size'
        )

    def _handle_encoding_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Encoding error: {error}")
        return ChunkingError(
            error_type='encoding_error',
            message=str(error),
            recovery_action='fallback_encoding'
        )

    def _handle_memory_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Memory error during chunking: {error}")
        return ChunkingError(
            error_type='memory_error',
            message=str(error),
            recovery_action='process_in_batches'
        )

    def _handle_structure_parsing_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Structure parsing failed: {error}")
        return ChunkingError(
            error_type='structure_parsing_error',
            message=str(error),
            recovery_action='fallback_to_fixed_size'
        )

    def _handle_generic_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Unexpected error during chunking: {error}")
        return ChunkingError(
            error_type='generic_error',
            message=str(error),
            recovery_action='skip_and_continue'
        )
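A brief usage sketch showing how a chunking pipeline might route exceptions through the handler and act on the suggested recovery_action. The `chunk_document` and `fallback` callables are hypothetical placeholders, not part of the classes above.

# Illustrative only: chunk_document and fallback are hypothetical placeholders
# showing how recovery_action might drive a retry or a skip.
handler = ChunkingErrorHandler()

def safe_chunk(content: str, chunk_document, fallback):
    try:
        return chunk_document(content)
    except Exception as exc:
        chunking_error = handler.handle_error(exc, context={'chunk_index': None})
        if chunking_error.recovery_action == 'fallback_to_fixed_size':
            return fallback(content)
        return []  # skip_and_continue and other actions: return no chunks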
Performance Optimization
Caching and Memoization
import hashlib
import json
import logging
import pickle
from typing import Dict, Any, List, Optional

import redis


class ChunkingCache:
    def __init__(self, redis_url: Optional[str] = None):
        if redis_url:
            self.redis_client = redis.from_url(redis_url)
        else:
            self.redis_client = None
        self.local_cache: Dict[str, List[Dict[str, Any]]] = {}

    def _generate_cache_key(self, content: str, strategy: str, params: Dict[str, Any]) -> str:
        content_hash = hashlib.md5(content.encode()).hexdigest()
        params_str = json.dumps(params, sort_keys=True)
        params_hash = hashlib.md5(params_str.encode()).hexdigest()
        return f"chunking:{strategy}:{content_hash}:{params_hash}"
    def get(self, content: str, strategy: str, params: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
        cache_key = self._generate_cache_key(content, strategy, params)

        # Try the local cache first
        if cache_key in self.local_cache:
            return self.local_cache[cache_key]

        # Fall back to the Redis cache
        if self.redis_client:
            try:
                cached_data = self.redis_client.get(cache_key)
                if cached_data:
                    chunks = pickle.loads(cached_data)
                    self.local_cache[cache_key] = chunks  # Cache locally too
                    return chunks
            except Exception as e:
                logging.warning(f"Redis cache error: {e}")

        return None

    def set(self, content: str, strategy: str, params: Dict[str, Any], chunks: List[Dict[str, Any]]) -> None:
        cache_key = self._generate_cache_key(content, strategy, params)

        # Store in the local cache
        self.local_cache[cache_key] = chunks

        # Store in the Redis cache
        if self.redis_client:
            try:
                cached_data = pickle.dumps(chunks)
                self.redis_client.setex(cache_key, 3600, cached_data)  # 1 hour TTL
            except Exception as e:
                logging.warning(f"Redis cache set error: {e}")

    def clear_local_cache(self):
        self.local_cache.clear()

    def clear_redis_cache(self):
        if self.redis_client:
            pattern = "chunking:*"
            keys = self.redis_client.keys(pattern)
            if keys:
                self.redis_client.delete(*keys)
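A short cache-aside usage sketch tying the cache to a chunking call; `run_chunking` and the parameter dictionary are hypothetical stand-ins for whatever strategy the pipeline selected.

# Illustrative cache-aside usage; run_chunking is a hypothetical stand-in for
# the selected strategy's chunking call.
cache = ChunkingCache(redis_url=None)  # local-only cache when no Redis URL is given

def chunk_with_cache(content: str, strategy_name: str, params, run_chunking):
    cached = cache.get(content, strategy_name, params)
    if cached is not None:
        return cached
    chunks = run_chunking(content, **params)
    cache.set(content, strategy_name, params, chunks)
    return chunks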
Batch Processing
import asyncio
import concurrent.futures
import logging
from typing import List, Dict, Any, Callable


class BatchChunkingProcessor:
    def __init__(self, max_workers: int = 4, batch_size: int = 10):
        self.max_workers = max_workers
        self.batch_size = batch_size
    def process_documents_batch(self, documents: List[str],
                                chunking_function: Callable[[str], List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
        """Process multiple documents in parallel."""
        results = []

        # Process in batches to avoid memory issues
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]

            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_doc = {
                    executor.submit(chunking_function, doc): doc
                    for doc in batch
                }

                batch_results = []
                for future in concurrent.futures.as_completed(future_to_doc):
                    try:
                        chunks = future.result()
                        batch_results.append(chunks)
                    except Exception as e:
                        logging.error(f"Error processing document: {e}")
                        batch_results.append([])  # Empty result for failed processing

            results.extend(batch_results)

        return results

    async def process_documents_async(self, documents: List[str],
                                      chunking_function: Callable[[str], List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
        """Process documents asynchronously."""
        semaphore = asyncio.Semaphore(self.max_workers)

        async def process_single_document(doc: str) -> List[Dict[str, Any]]:
            async with semaphore:
                # Run the synchronous chunking function in an executor
                loop = asyncio.get_event_loop()
                return await loop.run_in_executor(None, chunking_function, doc)

        tasks = [process_single_document(doc) for doc in documents]
        return await asyncio.gather(*tasks, return_exceptions=True)
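A short driver sketch showing both entry points; `simple_chunker` and the sample documents are hypothetical stand-ins for the pipeline's real chunking function and inputs. Note that process_documents_batch returns results in completion order rather than input order.

# Illustrative driver; simple_chunker is a hypothetical stand-in for the
# pipeline's real chunking function.
def simple_chunker(doc: str):
    return [{'content': doc[i:i + 500]} for i in range(0, len(doc), 500)]

processor = BatchChunkingProcessor(max_workers=4, batch_size=10)

documents = ["first document text", "second document text"]
results = processor.process_documents_batch(documents, simple_chunker)

# Async variant (from a script, not from inside an already-running event loop):
# results = asyncio.run(processor.process_documents_async(documents, simple_chunker))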
Monitoring and Observability
Metrics Collection
import time
from dataclasses import dataclass
from typing import Dict, Any, List
from collections import defaultdict


@dataclass
class ChunkingMetrics:
    total_documents: int
    total_chunks: int
    avg_chunk_size: float
    processing_time: float
    memory_usage: float
    error_count: int
    strategy_distribution: Dict[str, int]


class MetricsCollector:
    def __init__(self):
        self.metrics = defaultdict(list)
        self.strategy_counts: Dict[str, int] = defaultdict(int)
        self.start_time = None

    def start_timing(self):
        self.start_time = time.time()

    def end_timing(self) -> float:
        if self.start_time:
            duration = time.time() - self.start_time
            self.metrics['processing_time'].append(duration)
            self.start_time = None
            return duration
        return 0.0

    def record_chunk_count(self, count: int):
        self.metrics['chunk_count'].append(count)

    def record_chunk_size(self, size: int):
        self.metrics['chunk_size'].append(size)

    def record_strategy_usage(self, strategy: str):
        self.strategy_counts[strategy] += 1

    def record_error(self, error_type: str):
        self.metrics['errors'].append(error_type)

    def record_memory_usage(self, memory_mb: float):
        self.metrics['memory_usage'].append(memory_mb)

    def get_summary(self) -> ChunkingMetrics:
        return ChunkingMetrics(
            total_documents=len(self.metrics['processing_time']),
            total_chunks=sum(self.metrics['chunk_count']),
            avg_chunk_size=sum(self.metrics['chunk_size']) / max(len(self.metrics['chunk_size']), 1),
            processing_time=sum(self.metrics['processing_time']),
            memory_usage=sum(self.metrics['memory_usage']) / max(len(self.metrics['memory_usage']), 1),
            error_count=len(self.metrics['errors']),
            strategy_distribution=dict(self.strategy_counts)
        )

    def reset(self):
        self.metrics.clear()
        self.strategy_counts.clear()
        self.start_time = None
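A minimal sketch of the collector wired into a processing loop; `documents`, `select_and_chunk`, and the strategy names are hypothetical placeholders.

# Illustrative wiring of MetricsCollector into a processing loop.
# documents and select_and_chunk are hypothetical placeholders.
collector = MetricsCollector()

for doc in documents:
    collector.start_timing()
    strategy_name, chunks = select_and_chunk(doc)  # e.g. ('recursive', [...])
    collector.end_timing()

    collector.record_strategy_usage(strategy_name)
    collector.record_chunk_count(len(chunks))
    for chunk in chunks:
        collector.record_chunk_size(len(chunk['content'].split()))

summary = collector.get_summary()
print(summary.total_documents, summary.avg_chunk_size, summary.strategy_distribution)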
This implementation guide provides a comprehensive foundation for building robust, scalable chunking systems that can handle various document types and use cases while maintaining high quality and performance.