# Complete Implementation Guidelines

This document provides comprehensive implementation guidance for building effective chunking systems.

## System Architecture

### Core Components

```
Document Processor
├── Ingestion Layer
│   ├── Document Type Detection
│   ├── Format Parsing (PDF, HTML, Markdown, etc.)
│   └── Content Extraction
├── Analysis Layer
│   ├── Structure Analysis
│   ├── Content Type Identification
│   └── Complexity Assessment
├── Strategy Selection Layer
│   ├── Rule-based Selection
│   ├── ML-based Prediction
│   └── Adaptive Configuration
├── Chunking Layer
│   ├── Strategy Implementation
│   ├── Parameter Optimization
│   └── Quality Validation
└── Output Layer
    ├── Chunk Metadata Generation
    ├── Embedding Integration
    └── Storage Preparation
```

## Pre-processing Pipeline

### Document Analysis Framework

```python
from dataclasses import dataclass
from typing import List
import re

@dataclass
class DocumentAnalysis:
    doc_type: str
    structure_score: float    # 0-1, higher means more structured
    complexity_score: float   # 0-1, higher means more complex
    content_types: List[str]
    language: str
    estimated_tokens: int
    has_multimodal: bool

class DocumentAnalyzer:
    def __init__(self):
        # Note: the HTML tag regexes below are reconstructed; the originals
        # were lost in formatting. Any common block-level tags work here.
        self.structure_patterns = {
            'markdown': [r'^#+\s', r'^\*\*.*\*\*$', r'^\* ', r'^\d+\. '],
            'html': [r'<h[1-6]', r'<p[\s>]', r'<div', r'<table'],
            'latex': [r'\\section', r'\\subsection', r'\\begin\{', r'\\end\{'],
            'academic': [r'^\d+\.', r'^\d+\.\d+', r'^[A-Z]\.', r'^Figure \d+']
        }

    def analyze(self, content: str) -> DocumentAnalysis:
        doc_type = self.detect_document_type(content)
        structure_score = self.calculate_structure_score(content, doc_type)
        complexity_score = self.calculate_complexity_score(content)
        content_types = self.identify_content_types(content)
        language = self.detect_language(content)
        estimated_tokens = self.estimate_tokens(content)
        has_multimodal = self.detect_multimodal_content(content)

        return DocumentAnalysis(
            doc_type=doc_type,
            structure_score=structure_score,
            complexity_score=complexity_score,
            content_types=content_types,
            language=language,
            estimated_tokens=estimated_tokens,
            has_multimodal=has_multimodal
        )

    def detect_document_type(self, content: str) -> str:
        # Reconstructed heuristic: the original tag checks were lost in
        # formatting. Order matters; earlier checks win.
        content_lower = content.lower()
        if '<html' in content_lower or '<div' in content_lower:
            return 'html'
        elif '\\section' in content or '\\begin{document}' in content:
            return 'latex'
        elif re.search(r'^#+\s', content, re.MULTILINE):
            return 'markdown'
        elif re.search(r'^(def |class |function )', content, re.MULTILINE):
            return 'code'
        elif re.search(r'^Abstract\b', content, re.MULTILINE | re.IGNORECASE):
            return 'academic'
        else:
            return 'text'

    def calculate_structure_score(self, content: str, doc_type: str) -> float:
        patterns = self.structure_patterns.get(doc_type, [])
        if not patterns:
            return 0.5  # Default for unstructured content

        line_count = len(content.split('\n'))
        structured_lines = 0
        for line in content.split('\n'):
            for pattern in patterns:
                if re.search(pattern, line.strip()):
                    structured_lines += 1
                    break

        return min(structured_lines / max(line_count, 1), 1.0)

    def calculate_complexity_score(self, content: str) -> float:
        # Factors that increase complexity
        avg_sentence_length = self.calculate_avg_sentence_length(content)
        vocabulary_richness = self.calculate_vocabulary_richness(content)
        nested_structure = self.detect_nested_structure(content)

        # Normalize and combine
        complexity = (
            min(avg_sentence_length / 30, 1.0) * 0.3 +
            vocabulary_richness * 0.4 +
            nested_structure * 0.3
        )
        return min(complexity, 1.0)

    def identify_content_types(self, content: str) -> List[str]:
        types = []
        if '```' in content or 'def ' in content or 'function ' in content.lower():
            types.append('code')
        if '|' in content and '\n' in content:
            types.append('tables')
        if re.search(r'\!\[.*\]\(.*\)', content):
            types.append('images')
        if re.search(r'http[s]?://', content):
            types.append('links')
        if re.search(r'\d+\.\d+', content) or re.search(r'\$\d', content):
            types.append('numbers')
        return types if types else ['text']

    def detect_language(self, content: str) -> str:
        # Simple language detection - can be enhanced with proper language detection libraries
        if re.search(r'[\u4e00-\u9fff]', content):
            return 'chinese'
        elif re.search(r'[\u0600-\u06ff]', content):
            return 'arabic'
        elif re.search(r'[\u0400-\u04ff]', content):
            return 'russian'
        else:
            return 'english'  # Default assumption

    def estimate_tokens(self, content: str) -> int:
        # Rough estimation - actual tokenization varies by model
        word_count = len(content.split())
        return int(word_count * 1.3)  # Average tokens per word

    def detect_multimodal_content(self, content: str) -> bool:
        # The HTML indicators are reconstructed; the original patterns were
        # lost in formatting.
        multimodal_indicators = [
            r'\!\[.*\]\(.*\)',  # Markdown images
            r'<img',            # HTML images
            r'<video',          # HTML video
        ]
        return any(re.search(p, content) for p in multimodal_indicators)

    def calculate_avg_sentence_length(self, content: str) -> float:
        sentences = re.split(r'[.!?]+', content)
        sentences = [s.strip() for s in sentences if s.strip()]
        if not sentences:
            return 0
        return sum(len(s.split()) for s in sentences) / len(sentences)

    def calculate_vocabulary_richness(self, content: str) -> float:
        words = content.lower().split()
        if not words:
            return 0
        unique_words = set(words)
        return len(unique_words) / len(words)

    def detect_nested_structure(self, content: str) -> float:
        # Detect nested lists, indented content, etc.
        lines = content.split('\n')
        indented_lines = 0
        for line in lines:
            if line.strip() and line.startswith(' '):
                indented_lines += 1
        return indented_lines / max(len(lines), 1)
```
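For orientation, a brief usage sketch of the analyzer; the sample document is illustrative:

```python
# Hypothetical usage of DocumentAnalyzer on a short Markdown document.
analyzer = DocumentAnalyzer()
sample = "# Title\n\nSome prose about chunking.\n\n* first item\n* second item\n"
analysis = analyzer.analyze(sample)
print(analysis.doc_type)          # 'markdown'
print(analysis.content_types)     # ['text'] for this sample
print(analysis.estimated_tokens)  # word count * 1.3, rounded down
```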

### Strategy Selection Engine

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List

class ChunkingStrategy(ABC):
    @abstractmethod
    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        pass

class StrategySelector:
    def __init__(self):
        # RecursiveStrategy, StructureAwareStrategy, and SemanticStrategy are
        # assumed to be defined elsewhere; only two examples appear below.
        self.strategies = {
            'fixed_size': FixedSizeStrategy(),
            'recursive': RecursiveStrategy(),
            'structure_aware': StructureAwareStrategy(),
            'semantic': SemanticStrategy(),
            'adaptive': AdaptiveStrategy()
        }

    def select_strategy(self, analysis: DocumentAnalysis) -> str:
        # Rule-based selection logic
        if analysis.structure_score > 0.8 and analysis.doc_type in ['markdown', 'html', 'latex']:
            return 'structure_aware'
        elif analysis.complexity_score > 0.7 and analysis.estimated_tokens < 10000:
            return 'semantic'
        elif analysis.doc_type == 'code':
            return 'structure_aware'
        elif analysis.structure_score < 0.3:
            return 'fixed_size'
        elif analysis.complexity_score > 0.5:
            return 'recursive'
        else:
            return 'adaptive'

    def get_strategy(self, analysis: DocumentAnalysis) -> ChunkingStrategy:
        strategy_name = self.select_strategy(analysis)
        return self.strategies[strategy_name]

# Example strategy implementations
class FixedSizeStrategy(ChunkingStrategy):
    def __init__(self, default_size=512, default_overlap=50):
        self.default_size = default_size
        self.default_overlap = default_overlap

    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Adjust parameters based on analysis
        if analysis.complexity_score > 0.7:
            chunk_size = 1024
        elif analysis.complexity_score < 0.3:
            chunk_size = 256
        else:
            chunk_size = self.default_size

        overlap = int(chunk_size * 0.1)  # 10% overlap
        return self._fixed_size_chunk(content, chunk_size, overlap)

    def _fixed_size_chunk(self, content: str, chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
        # Minimal word-based sliding window; a production system might use
        # RecursiveCharacterTextSplitter or a tokenizer-aware splitter instead.
        words = content.split()
        step = max(chunk_size - overlap, 1)
        chunks = []
        for start in range(0, len(words), step):
            window = words[start:start + chunk_size]
            if not window:
                break
            chunks.append({'content': ' '.join(window), 'start_word': start})
        return chunks

class AdaptiveStrategy(ChunkingStrategy):
    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Combine multiple strategies based on content characteristics.
        # Both lists start empty so the merge below never sees an unbound name.
        structured_chunks: List[Dict[str, Any]] = []
        unstructured_chunks: List[Dict[str, Any]] = []
        if analysis.structure_score > 0.6:
            # Use structure-aware chunking for structured parts
            structured_chunks = self._chunk_structured_parts(content, analysis)
        else:
            # Use fixed-size chunking for unstructured parts
            unstructured_chunks = self._chunk_unstructured_parts(content, analysis)

        # Merge and optimize
        return self._merge_chunks(structured_chunks + unstructured_chunks)

    def _chunk_structured_parts(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Implementation for structured content
        pass

    def _chunk_unstructured_parts(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Implementation for unstructured content
        pass

    def _merge_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Implementation for merging and optimizing chunks
        pass
```
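A short sketch of how analysis feeds a strategy; since only `FixedSizeStrategy` and `AdaptiveStrategy` are fleshed out above, this calls `FixedSizeStrategy` directly rather than going through `StrategySelector` (which would instantiate the stubbed strategies):

```python
# Hypothetical wiring of analyzer output into a concrete strategy.
analyzer = DocumentAnalyzer()
document = "Plain unstructured prose, repeated to simulate a long input. " * 200
analysis = analyzer.analyze(document)

strategy = FixedSizeStrategy()
chunks = strategy.chunk(document, analysis)  # low complexity -> 256-word chunks
print(len(chunks), len(chunks[0]['content'].split()))
```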

## Quality Assurance Framework

### Chunk Quality Metrics

```python
import re
from typing import Any, Dict, List

import numpy as np

class ChunkQualityAssessor:
    def __init__(self):
        self.quality_weights = {
            'coherence': 0.3,
            'completeness': 0.25,
            'size_appropriateness': 0.2,
            'semantic_similarity': 0.15,
            'boundary_quality': 0.1
        }

    def assess_chunks(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> Dict[str, float]:
        scores = {}

        # Coherence: Do chunks make sense on their own?
        scores['coherence'] = self._assess_coherence(chunks)

        # Completeness: Do chunks preserve important information?
        scores['completeness'] = self._assess_completeness(chunks, analysis)

        # Size appropriateness: Are chunks within the optimal size range?
        scores['size_appropriateness'] = self._assess_size(chunks)

        # Semantic similarity: Are chunks thematically consistent?
        scores['semantic_similarity'] = self._assess_semantic_consistency(chunks)

        # Boundary quality: Are chunk boundaries placed well?
        scores['boundary_quality'] = self._assess_boundary_quality(chunks)

        # Calculate overall quality score
        overall_score = sum(
            score * self.quality_weights[metric]
            for metric, score in scores.items()
        )
        scores['overall'] = overall_score
        return scores

    def _assess_coherence(self, chunks: List[Dict[str, Any]]) -> float:
        # Simple heuristic-based coherence assessment
        coherence_scores = []
        for chunk in chunks:
            content = chunk['content']
            # Check for complete sentences
            sentences = re.split(r'[.!?]+', content)
            complete_sentences = sum(1 for s in sentences if s.strip())
            coherence = complete_sentences / max(len(sentences), 1)
            coherence_scores.append(coherence)
        return float(np.mean(coherence_scores))

    def _assess_completeness(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> float:
        # Check if important structural elements are preserved
        if analysis.doc_type in ['markdown', 'html']:
            return self._assess_structure_preservation(chunks, analysis)
        else:
            return self._assess_content_preservation(chunks)

    def _assess_structure_preservation(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> float:
        # Check if headings, lists, and other structural elements are preserved
        preserved_elements = 0
        total_elements = 0
        for chunk in chunks:
            content = chunk['content']
            # Count preserved structural elements
            headings = len(re.findall(r'^#+\s', content, re.MULTILINE))
            lists = len(re.findall(r'^\s*[-*+]\s', content, re.MULTILINE))
            preserved_elements += headings + lists
            total_elements += 1  # Simplified count
        return preserved_elements / max(total_elements, 1)

    def _assess_content_preservation(self, chunks: List[Dict[str, Any]]) -> float:
        # Simple check based on content ratio; a real implementation would
        # compare the joined chunks against the original document.
        total_content = ''.join(chunk['content'] for chunk in chunks)
        return 0.8  # Placeholder

    def _assess_size(self, chunks: List[Dict[str, Any]]) -> float:
        optimal_min = 100   # tokens
        optimal_max = 1000  # tokens
        size_scores = []
        for chunk in chunks:
            token_count = self._estimate_tokens(chunk['content'])
            if optimal_min <= token_count <= optimal_max:
                score = 1.0
            elif token_count < optimal_min:
                score = token_count / optimal_min
            else:
                score = max(0, 1 - (token_count - optimal_max) / optimal_max)
            size_scores.append(score)
        return float(np.mean(size_scores))

    def _assess_semantic_consistency(self, chunks: List[Dict[str, Any]]) -> float:
        # Requires an embedding model for an actual implementation; see the
        # sketch after this code block. Placeholder value for now.
        return 0.7

    def _assess_boundary_quality(self, chunks: List[Dict[str, Any]]) -> float:
        # Check that boundaries don't split important content
        boundary_scores = []
        for chunk in chunks:
            content = chunk['content']
            # Penalize chunks that end mid-sentence
            if not content.strip().endswith(('.', '!', '?', '>', '}')):
                boundary_scores.append(0.5)
            else:
                boundary_scores.append(1.0)
        return float(np.mean(boundary_scores))

    def _estimate_tokens(self, content: str) -> int:
        # Simple token estimation (~4 tokens per 3 words)
        return len(content.split()) * 4 // 3
```
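`_assess_semantic_consistency` above returns a placeholder. One way to realize it, sketched under the assumption that the `sentence-transformers` package and the `all-MiniLM-L6-v2` model are available (neither is prescribed by this guide):

```python
# Hypothetical embedding-based semantic consistency score.
# Assumes: pip install sentence-transformers scikit-learn
import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

_model = SentenceTransformer('all-MiniLM-L6-v2')

def assess_semantic_consistency(chunks):
    """Mean cosine similarity between adjacent chunks; higher = more consistent."""
    if len(chunks) < 2:
        return 1.0
    texts = [chunk['content'] for chunk in chunks]
    embeddings = _model.encode(texts)  # (n, dim) array
    sims = cosine_similarity(embeddings[:-1], embeddings[1:])
    # The diagonal holds sim(chunk_i, chunk_{i+1}) for each adjacent pair.
    return float(np.mean(np.diag(sims)))
```

Adjacent-pair similarity is one design choice; comparing each chunk to the document centroid is another, and penalizes off-topic chunks more directly.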

## Error Handling and Edge Cases

### Robust Error Handling

```python
import logging
from dataclasses import dataclass
from typing import Any, Dict, Optional

@dataclass
class ChunkingError:
    error_type: str
    message: str
    chunk_index: Optional[int] = None
    recovery_action: Optional[str] = None

class ChunkingErrorHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_handlers = {
            'empty_content': self._handle_empty_content,
            'oversized_chunk': self._handle_oversized_chunk,
            'encoding_error': self._handle_encoding_error,
            'memory_error': self._handle_memory_error,
            'structure_parsing_error': self._handle_structure_parsing_error
        }

    def handle_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        error_type = self._classify_error(error)
        handler = self.error_handlers.get(error_type, self._handle_generic_error)
        return handler(error, context)

    def _classify_error(self, error: Exception) -> str:
        if isinstance(error, ValueError) and 'empty' in str(error).lower():
            return 'empty_content'
        elif isinstance(error, MemoryError):
            return 'memory_error'
        elif isinstance(error, UnicodeError):
            return 'encoding_error'
        elif 'too large' in str(error).lower():
            return 'oversized_chunk'
        elif 'parsing' in str(error).lower():
            return 'structure_parsing_error'
        else:
            return 'generic_error'

    def _handle_empty_content(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Empty content encountered: {error}")
        return ChunkingError(
            error_type='empty_content',
            message=str(error),
            recovery_action='skip_empty_content'
        )

    def _handle_oversized_chunk(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Oversized chunk detected: {error}")
        return ChunkingError(
            error_type='oversized_chunk',
            message=str(error),
            chunk_index=context.get('chunk_index'),
            recovery_action='reduce_chunk_size'
        )

    def _handle_encoding_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Encoding error: {error}")
        return ChunkingError(
            error_type='encoding_error',
            message=str(error),
            recovery_action='fallback_encoding'
        )

    def _handle_memory_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Memory error during chunking: {error}")
        return ChunkingError(
            error_type='memory_error',
            message=str(error),
            recovery_action='process_in_batches'
        )

    def _handle_structure_parsing_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Structure parsing failed: {error}")
        return ChunkingError(
            error_type='structure_parsing_error',
            message=str(error),
            recovery_action='fallback_to_fixed_size'
        )

    def _handle_generic_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Unexpected error during chunking: {error}")
        return ChunkingError(
            error_type='generic_error',
            message=str(error),
            recovery_action='skip_and_continue'
        )
```
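A sketch of the handler in use; `safe_chunk` and its fallback policy are illustrative, not part of the framework above:

```python
# Hypothetical wrapper that routes failures through ChunkingErrorHandler
# and acts on the suggested recovery_action.
handler = ChunkingErrorHandler()

def safe_chunk(strategy, content, analysis):
    try:
        return strategy.chunk(content, analysis)
    except Exception as exc:
        err = handler.handle_error(exc, context={'chunk_index': None})
        if err.recovery_action == 'fallback_to_fixed_size':
            return FixedSizeStrategy().chunk(content, analysis)
        return []  # e.g. skip_empty_content / skip_and_continue
```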

## Performance Optimization

### Caching and Memoization

```python
import hashlib
import json
import logging
import pickle
from typing import Any, Dict, List, Optional

import redis

class ChunkingCache:
    def __init__(self, redis_url: Optional[str] = None):
        if redis_url:
            self.redis_client = redis.from_url(redis_url)
        else:
            self.redis_client = None
        self.local_cache = {}

    def _generate_cache_key(self, content: str, strategy: str, params: Dict[str, Any]) -> str:
        content_hash = hashlib.md5(content.encode()).hexdigest()
        params_str = json.dumps(params, sort_keys=True)
        params_hash = hashlib.md5(params_str.encode()).hexdigest()
        return f"chunking:{strategy}:{content_hash}:{params_hash}"

    def get(self, content: str, strategy: str, params: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
        cache_key = self._generate_cache_key(content, strategy, params)

        # Try local cache first
        if cache_key in self.local_cache:
            return self.local_cache[cache_key]

        # Try Redis cache
        if self.redis_client:
            try:
                cached_data = self.redis_client.get(cache_key)
                if cached_data:
                    chunks = pickle.loads(cached_data)
                    self.local_cache[cache_key] = chunks  # Cache locally too
                    return chunks
            except Exception as e:
                logging.warning(f"Redis cache error: {e}")
        return None

    def set(self, content: str, strategy: str, params: Dict[str, Any], chunks: List[Dict[str, Any]]) -> None:
        cache_key = self._generate_cache_key(content, strategy, params)

        # Store in local cache
        self.local_cache[cache_key] = chunks

        # Store in Redis cache
        if self.redis_client:
            try:
                cached_data = pickle.dumps(chunks)
                self.redis_client.setex(cache_key, 3600, cached_data)  # 1 hour TTL
            except Exception as e:
                logging.warning(f"Redis cache set error: {e}")

    def clear_local_cache(self):
        self.local_cache.clear()

    def clear_redis_cache(self):
        if self.redis_client:
            pattern = "chunking:*"
            keys = self.redis_client.keys(pattern)
            if keys:
                self.redis_client.delete(*keys)
```
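Read-through usage might look like the sketch below; `chunk_fn` stands in for whatever callable produces the chunks:

```python
# Hypothetical read-through caching around an arbitrary chunking callable.
cache = ChunkingCache()  # local-only; pass redis_url="redis://..." to share
params = {'chunk_size': 512, 'overlap': 50}

def chunk_with_cache(content, strategy_name, chunk_fn):
    cached = cache.get(content, strategy_name, params)
    if cached is not None:
        return cached
    chunks = chunk_fn(content)
    cache.set(content, strategy_name, params, chunks)
    return chunks
```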

### Batch Processing

```python
import asyncio
import concurrent.futures
import logging
from typing import Any, Callable, Dict, List

class BatchChunkingProcessor:
    def __init__(self, max_workers: int = 4, batch_size: int = 10):
        self.max_workers = max_workers
        self.batch_size = batch_size

    def process_documents_batch(self, documents: List[str],
                                chunking_function: Callable[[str], List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
        """Process multiple documents in parallel."""
        results = []

        # Process in batches to avoid memory issues
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]
            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_doc = {
                    executor.submit(chunking_function, doc): doc
                    for doc in batch
                }
                # Note: results within a batch arrive in completion order,
                # not input order.
                batch_results = []
                for future in concurrent.futures.as_completed(future_to_doc):
                    try:
                        chunks = future.result()
                        batch_results.append(chunks)
                    except Exception as e:
                        logging.error(f"Error processing document: {e}")
                        batch_results.append([])  # Empty result for failed processing
            results.extend(batch_results)
        return results

    async def process_documents_async(self, documents: List[str],
                                      chunking_function: Callable[[str], List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
        """Process documents asynchronously."""
        semaphore = asyncio.Semaphore(self.max_workers)

        async def process_single_document(doc: str) -> List[Dict[str, Any]]:
            async with semaphore:
                # Run the synchronous chunking function in an executor
                loop = asyncio.get_running_loop()
                return await loop.run_in_executor(None, chunking_function, doc)

        tasks = [process_single_document(doc) for doc in documents]
        # return_exceptions=True: failures come back as exception objects
        return await asyncio.gather(*tasks, return_exceptions=True)
```
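For example, a small batch run with a trivial stand-in chunker (`simple_chunker` is illustrative):

```python
# Hypothetical batch run over a tiny corpus.
def simple_chunker(doc):
    # Naive fixed-width character chunks, purely for demonstration.
    return [{'content': doc[i:i + 1000]} for i in range(0, len(doc), 1000)]

processor = BatchChunkingProcessor(max_workers=4, batch_size=10)
docs = ["first document " * 100, "second document " * 200]
all_chunks = processor.process_documents_batch(docs, simple_chunker)
print([len(chunks) for chunks in all_chunks])  # counts in completion order
```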

## Monitoring and Observability

### Metrics Collection

```python
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict

@dataclass
class ChunkingMetrics:
    total_documents: int
    total_chunks: int
    avg_chunk_size: float
    processing_time: float
    memory_usage: float
    error_count: int
    strategy_distribution: Dict[str, int]

class MetricsCollector:
    def __init__(self):
        self.metrics = defaultdict(list)
        # Strategy usage is a counter, not a list of samples, so it gets its
        # own store (the original indexed into the list-valued defaultdict).
        self.strategy_counts = defaultdict(int)
        self.start_time = None

    def start_timing(self):
        self.start_time = time.time()

    def end_timing(self) -> float:
        if self.start_time:
            duration = time.time() - self.start_time
            self.metrics['processing_time'].append(duration)
            self.start_time = None
            return duration
        return 0.0

    def record_chunk_count(self, count: int):
        self.metrics['chunk_count'].append(count)

    def record_chunk_size(self, size: int):
        self.metrics['chunk_size'].append(size)

    def record_strategy_usage(self, strategy: str):
        self.strategy_counts[strategy] += 1

    def record_error(self, error_type: str):
        self.metrics['errors'].append(error_type)

    def record_memory_usage(self, memory_mb: float):
        self.metrics['memory_usage'].append(memory_mb)

    def get_summary(self) -> ChunkingMetrics:
        return ChunkingMetrics(
            total_documents=len(self.metrics['processing_time']),
            total_chunks=sum(self.metrics['chunk_count']),
            avg_chunk_size=sum(self.metrics['chunk_size']) / max(len(self.metrics['chunk_size']), 1),
            processing_time=sum(self.metrics['processing_time']),
            memory_usage=sum(self.metrics['memory_usage']) / max(len(self.metrics['memory_usage']), 1),
            error_count=len(self.metrics['errors']),
            strategy_distribution=dict(self.strategy_counts)
        )

    def reset(self):
        self.metrics.clear()
        self.strategy_counts.clear()
        self.start_time = None
```

This implementation guide provides a comprehensive foundation for building robust, scalable chunking systems that can handle various document types and use cases while maintaining high quality and performance.
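As a closing illustration, a minimal, hypothetical sketch tying `MetricsCollector` to a chunking run; `simple_chunker` is the illustrative stand-in from the batch-processing example:

```python
# Hypothetical end-to-end run with metrics collection.
collector = MetricsCollector()

for doc in ["alpha " * 300, "beta " * 500]:
    collector.start_timing()
    chunks = simple_chunker(doc)  # any strategy's chunk() works here
    collector.end_timing()
    collector.record_chunk_count(len(chunks))
    for chunk in chunks:
        collector.record_chunk_size(len(chunk['content'].split()))
    collector.record_strategy_usage('fixed_size')

summary = collector.get_summary()
print(summary.total_documents, summary.total_chunks, round(summary.avg_chunk_size, 1))
```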