# Complete Implementation Guidelines

This document provides comprehensive implementation guidance for building effective chunking systems.

## System Architecture

### Core Components

```
Document Processor
├── Ingestion Layer
│   ├── Document Type Detection
│   ├── Format Parsing (PDF, HTML, Markdown, etc.)
│   └── Content Extraction
├── Analysis Layer
│   ├── Structure Analysis
│   ├── Content Type Identification
│   └── Complexity Assessment
├── Strategy Selection Layer
│   ├── Rule-based Selection
│   ├── ML-based Prediction
│   └── Adaptive Configuration
├── Chunking Layer
│   ├── Strategy Implementation
│   ├── Parameter Optimization
│   └── Quality Validation
└── Output Layer
    ├── Chunk Metadata Generation
    ├── Embedding Integration
    └── Storage Preparation
```
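
The classes in the rest of this guide slot into these layers. As a minimal sketch of the wiring (the `DocumentProcessor` class and its `process` method are illustrative names only, built from the `DocumentAnalyzer`, `StrategySelector`, and `ChunkQualityAssessor` defined in the sections below):

```python
from typing import Dict, Any

# Illustrative orchestration of the layers above; the component classes are
# defined in the sections that follow.
class DocumentProcessor:
    def __init__(self):
        self.analyzer = DocumentAnalyzer()      # Analysis layer
        self.selector = StrategySelector()      # Strategy selection layer
        self.assessor = ChunkQualityAssessor()  # Quality validation

    def process(self, content: str) -> Dict[str, Any]:
        analysis = self.analyzer.analyze(content)
        strategy = self.selector.get_strategy(analysis)
        chunks = strategy.chunk(content, analysis)
        quality = self.assessor.assess_chunks(chunks, analysis)
        return {'analysis': analysis, 'chunks': chunks, 'quality': quality}
```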

## Pre-processing Pipeline

### Document Analysis Framework

```python
from dataclasses import dataclass
from typing import List, Dict, Any
import re

@dataclass
class DocumentAnalysis:
    doc_type: str
    structure_score: float   # 0-1, higher means more structured
    complexity_score: float  # 0-1, higher means more complex
    content_types: List[str]
    language: str
    estimated_tokens: int
    has_multimodal: bool

class DocumentAnalyzer:
    def __init__(self):
        self.structure_patterns = {
            'markdown': [r'^#+\s', r'^\*\*.*\*\*$', r'^\* ', r'^\d+\. '],
            'html': [r'<h[1-6]>', r'<p>', r'<div>', r'<table>'],
            'latex': [r'\\section', r'\\subsection', r'\\begin\{', r'\\end\{'],
            'academic': [r'^\d+\.', r'^\d+\.\d+', r'^[A-Z]\.', r'^Figure \d+']
        }

    def analyze(self, content: str) -> DocumentAnalysis:
        doc_type = self.detect_document_type(content)
        structure_score = self.calculate_structure_score(content, doc_type)
        complexity_score = self.calculate_complexity_score(content)
        content_types = self.identify_content_types(content)
        language = self.detect_language(content)
        estimated_tokens = self.estimate_tokens(content)
        has_multimodal = self.detect_multimodal_content(content)

        return DocumentAnalysis(
            doc_type=doc_type,
            structure_score=structure_score,
            complexity_score=complexity_score,
            content_types=content_types,
            language=language,
            estimated_tokens=estimated_tokens,
            has_multimodal=has_multimodal
        )

    def detect_document_type(self, content: str) -> str:
        content_lower = content.lower()

        if '<html' in content_lower or '<body' in content_lower:
            return 'html'
        elif '#' in content and '##' in content:
            return 'markdown'
        elif '\\documentclass' in content_lower or '\\begin{' in content_lower:
            return 'latex'
        elif any(keyword in content_lower for keyword in ['abstract', 'introduction', 'conclusion', 'references']):
            return 'academic'
        elif 'def ' in content or 'class ' in content or 'function ' in content_lower:
            return 'code'
        else:
            return 'plain'

    def calculate_structure_score(self, content: str, doc_type: str) -> float:
        patterns = self.structure_patterns.get(doc_type, [])
        if not patterns:
            return 0.5  # Default for unstructured content

        line_count = len(content.split('\n'))
        structured_lines = 0

        for line in content.split('\n'):
            for pattern in patterns:
                if re.search(pattern, line.strip()):
                    structured_lines += 1
                    break

        return min(structured_lines / max(line_count, 1), 1.0)

    def calculate_complexity_score(self, content: str) -> float:
        # Factors that increase complexity
        avg_sentence_length = self.calculate_avg_sentence_length(content)
        vocabulary_richness = self.calculate_vocabulary_richness(content)
        nested_structure = self.detect_nested_structure(content)

        # Normalize and combine
        complexity = (
            min(avg_sentence_length / 30, 1.0) * 0.3 +
            vocabulary_richness * 0.4 +
            nested_structure * 0.3
        )

        return min(complexity, 1.0)

    def identify_content_types(self, content: str) -> List[str]:
        types = []

        if '```' in content or 'def ' in content or 'function ' in content.lower():
            types.append('code')
        if '|' in content and '\n' in content:
            types.append('tables')
        if re.search(r'!\[.*\]\(.*\)', content):
            types.append('images')
        if re.search(r'http[s]?://', content):
            types.append('links')
        if re.search(r'\d+\.\d+', content) or re.search(r'\$\d', content):
            types.append('numbers')

        return types if types else ['text']

    def detect_language(self, content: str) -> str:
        # Simple language detection - can be enhanced with proper language detection libraries
        if re.search(r'[\u4e00-\u9fff]', content):
            return 'chinese'
        elif re.search(r'[\u0600-\u06ff]', content):
            return 'arabic'
        elif re.search(r'[\u0400-\u04ff]', content):
            return 'russian'
        else:
            return 'english'  # Default assumption

    def estimate_tokens(self, content: str) -> int:
        # Rough estimation - actual tokenization varies by model
        word_count = len(content.split())
        return int(word_count * 1.3)  # Average tokens per word

    def detect_multimodal_content(self, content: str) -> bool:
        multimodal_indicators = [
            r'!\[.*\]\(.*\)',  # Images
            r'<iframe',        # Embedded content
            r'<object',        # Embedded objects
            r'<embed',         # Embedded media
        ]

        return any(re.search(pattern, content) for pattern in multimodal_indicators)

    def calculate_avg_sentence_length(self, content: str) -> float:
        sentences = re.split(r'[.!?]+', content)
        sentences = [s.strip() for s in sentences if s.strip()]
        if not sentences:
            return 0
        return sum(len(s.split()) for s in sentences) / len(sentences)

    def calculate_vocabulary_richness(self, content: str) -> float:
        words = content.lower().split()
        if not words:
            return 0
        unique_words = set(words)
        return len(unique_words) / len(words)

    def detect_nested_structure(self, content: str) -> float:
        # Detect nested lists, indented content, etc.
        lines = content.split('\n')
        indented_lines = 0

        for line in lines:
            if line.strip() and line.startswith(' '):
                indented_lines += 1

        return indented_lines / max(len(lines), 1)
```
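
A quick usage sketch (the sample text is illustrative):

```python
# Run the analyzer on a small markdown snippet and inspect the result
analyzer = DocumentAnalyzer()
analysis = analyzer.analyze("# Title\n\n## Section\n\nBody text with a [link](https://example.com).")
print(analysis.doc_type, analysis.structure_score, analysis.estimated_tokens)
```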

### Strategy Selection Engine

```python
from abc import ABC, abstractmethod
from typing import Dict, Any, List

class ChunkingStrategy(ABC):
    @abstractmethod
    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        pass

class StrategySelector:
    def __init__(self):
        self.strategies = {
            'fixed_size': FixedSizeStrategy(),
            'recursive': RecursiveStrategy(),
            'structure_aware': StructureAwareStrategy(),
            'semantic': SemanticStrategy(),
            'adaptive': AdaptiveStrategy()
        }

    def select_strategy(self, analysis: DocumentAnalysis) -> str:
        # Rule-based selection logic
        if analysis.structure_score > 0.8 and analysis.doc_type in ['markdown', 'html', 'latex']:
            return 'structure_aware'
        elif analysis.complexity_score > 0.7 and analysis.estimated_tokens < 10000:
            return 'semantic'
        elif analysis.doc_type == 'code':
            return 'structure_aware'
        elif analysis.structure_score < 0.3:
            return 'fixed_size'
        elif analysis.complexity_score > 0.5:
            return 'recursive'
        else:
            return 'adaptive'

    def get_strategy(self, analysis: DocumentAnalysis) -> ChunkingStrategy:
        strategy_name = self.select_strategy(analysis)
        return self.strategies[strategy_name]

# Example strategy implementations (RecursiveStrategy, StructureAwareStrategy, and
# SemanticStrategy follow the same ChunkingStrategy interface and are omitted here)
class FixedSizeStrategy(ChunkingStrategy):
    def __init__(self, default_size=512, default_overlap=50):
        self.default_size = default_size
        self.default_overlap = default_overlap

    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Adjust parameters based on analysis
        if analysis.complexity_score > 0.7:
            chunk_size = 1024
        elif analysis.complexity_score < 0.3:
            chunk_size = 256
        else:
            chunk_size = self.default_size

        overlap = int(chunk_size * 0.1)  # 10% overlap

        return self._fixed_size_chunk(content, chunk_size, overlap)

    def _fixed_size_chunk(self, content: str, chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
        # Implementation using RecursiveCharacterTextSplitter or custom logic
        pass

class AdaptiveStrategy(ChunkingStrategy):
    def chunk(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Combine multiple strategies based on content characteristics
        structured_chunks: List[Dict[str, Any]] = []
        unstructured_chunks: List[Dict[str, Any]] = []

        if analysis.structure_score > 0.6:
            # Use structure-aware chunking for structured parts
            structured_chunks = self._chunk_structured_parts(content, analysis)
        else:
            # Use fixed-size chunking for unstructured parts
            unstructured_chunks = self._chunk_unstructured_parts(content, analysis)

        # Merge and optimize
        return self._merge_chunks(structured_chunks + unstructured_chunks)

    def _chunk_structured_parts(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Implementation for structured content
        pass

    def _chunk_unstructured_parts(self, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
        # Implementation for unstructured content
        pass

    def _merge_chunks(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Implementation for merging and optimizing chunks
        pass
```
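
`_fixed_size_chunk` is left as a placeholder above. A minimal word-based sliding-window version might look like the following; the `start_word` metadata key is an assumption for illustration, not part of the interface:

```python
# Minimal sketch of the placeholder _fixed_size_chunk; a production version would
# more likely delegate to a library splitter such as RecursiveCharacterTextSplitter.
def _fixed_size_chunk(self, content: str, chunk_size: int, overlap: int) -> List[Dict[str, Any]]:
    words = content.split()
    step = max(chunk_size - overlap, 1)
    chunks: List[Dict[str, Any]] = []
    for start in range(0, len(words), step):
        window = words[start:start + chunk_size]
        if not window:
            break
        chunks.append({'content': ' '.join(window), 'start_word': start})
    return chunks
```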

## Quality Assurance Framework

### Chunk Quality Metrics

```python
import re
from typing import List, Dict, Any
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity  # used once embeddings are wired in

class ChunkQualityAssessor:
    def __init__(self):
        self.quality_weights = {
            'coherence': 0.3,
            'completeness': 0.25,
            'size_appropriateness': 0.2,
            'semantic_similarity': 0.15,
            'boundary_quality': 0.1
        }

    def assess_chunks(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> Dict[str, float]:
        scores = {}

        # Coherence: do chunks make sense on their own?
        scores['coherence'] = self._assess_coherence(chunks)

        # Completeness: do chunks preserve important information?
        scores['completeness'] = self._assess_completeness(chunks, analysis)

        # Size appropriateness: are chunks within the optimal size range?
        scores['size_appropriateness'] = self._assess_size(chunks)

        # Semantic similarity: are chunks thematically consistent?
        scores['semantic_similarity'] = self._assess_semantic_consistency(chunks)

        # Boundary quality: are chunk boundaries placed well?
        scores['boundary_quality'] = self._assess_boundary_quality(chunks)

        # Calculate overall quality score
        overall_score = sum(
            score * self.quality_weights[metric]
            for metric, score in scores.items()
        )

        scores['overall'] = overall_score
        return scores

    def _assess_coherence(self, chunks: List[Dict[str, Any]]) -> float:
        # Simple heuristic-based coherence assessment
        coherence_scores = []

        for chunk in chunks:
            content = chunk['content']

            # Check for complete sentences
            sentences = re.split(r'[.!?]+', content)
            complete_sentences = sum(1 for s in sentences if s.strip())
            coherence = complete_sentences / max(len(sentences), 1)

            coherence_scores.append(coherence)

        return np.mean(coherence_scores)

    def _assess_completeness(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> float:
        # Check if important structural elements are preserved
        if analysis.doc_type in ['markdown', 'html']:
            return self._assess_structure_preservation(chunks, analysis)
        else:
            return self._assess_content_preservation(chunks)

    def _assess_structure_preservation(self, chunks: List[Dict[str, Any]], analysis: DocumentAnalysis) -> float:
        # Check if headings, lists, and other structural elements are preserved
        preserved_elements = 0
        total_elements = 0

        for chunk in chunks:
            content = chunk['content']

            # Count preserved structural elements
            headings = len(re.findall(r'^#+\s', content, re.MULTILINE))
            lists = len(re.findall(r'^\s*[-*+]\s', content, re.MULTILINE))

            preserved_elements += headings + lists
            total_elements += 1  # Simplified count

        return preserved_elements / max(total_elements, 1)

    def _assess_content_preservation(self, chunks: List[Dict[str, Any]]) -> float:
        # Simple check based on content ratio
        total_content = ''.join(chunk['content'] for chunk in chunks)
        # This would need comparison with the original content
        return 0.8  # Placeholder

    def _assess_size(self, chunks: List[Dict[str, Any]]) -> float:
        optimal_min = 100   # tokens
        optimal_max = 1000  # tokens

        size_scores = []
        for chunk in chunks:
            token_count = self._estimate_tokens(chunk['content'])
            if optimal_min <= token_count <= optimal_max:
                score = 1.0
            elif token_count < optimal_min:
                score = token_count / optimal_min
            else:
                score = max(0, 1 - (token_count - optimal_max) / optimal_max)

            size_scores.append(score)

        return np.mean(size_scores)

    def _assess_semantic_consistency(self, chunks: List[Dict[str, Any]]) -> float:
        # This would require an embedding model for an actual implementation
        # Placeholder implementation
        return 0.7

    def _assess_boundary_quality(self, chunks: List[Dict[str, Any]]) -> float:
        # Check that boundaries don't split important content
        boundary_scores = []

        for chunk in chunks:
            content = chunk['content']

            # Check for incomplete sentences at boundaries
            if not content.strip().endswith(('.', '!', '?', '>', '}')):
                boundary_scores.append(0.5)
            else:
                boundary_scores.append(1.0)

        return np.mean(boundary_scores)

    def _estimate_tokens(self, content: str) -> int:
        # Simple token estimation
        return len(content.split()) * 4 // 3  # Rough approximation
```
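
`_assess_semantic_consistency` is only a placeholder above. One way to make it real is to embed each chunk and average the cosine similarity of neighbouring chunks; the sketch below assumes the `sentence-transformers` package and the `all-MiniLM-L6-v2` model, neither of which is a requirement of the framework itself:

```python
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

def assess_semantic_consistency(chunks, model_name: str = 'all-MiniLM-L6-v2') -> float:
    # Embed each chunk and average cosine similarity between neighbours;
    # higher values suggest the chunking kept related content together.
    if len(chunks) < 2:
        return 1.0  # A single chunk is trivially consistent
    model = SentenceTransformer(model_name)
    embeddings = model.encode([chunk['content'] for chunk in chunks])
    similarities = [
        cosine_similarity([embeddings[i]], [embeddings[i + 1]])[0][0]
        for i in range(len(embeddings) - 1)
    ]
    return float(np.mean(similarities))
```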

## Error Handling and Edge Cases

### Robust Error Handling

```python
import logging
from typing import Optional, List, Dict, Any
from dataclasses import dataclass

@dataclass
class ChunkingError:
    error_type: str
    message: str
    chunk_index: Optional[int] = None
    recovery_action: Optional[str] = None

class ChunkingErrorHandler:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.error_handlers = {
            'empty_content': self._handle_empty_content,
            'oversized_chunk': self._handle_oversized_chunk,
            'encoding_error': self._handle_encoding_error,
            'memory_error': self._handle_memory_error,
            'structure_parsing_error': self._handle_structure_parsing_error
        }

    def handle_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        error_type = self._classify_error(error)
        handler = self.error_handlers.get(error_type, self._handle_generic_error)
        return handler(error, context)

    def _classify_error(self, error: Exception) -> str:
        if isinstance(error, ValueError) and 'empty' in str(error).lower():
            return 'empty_content'
        elif isinstance(error, MemoryError):
            return 'memory_error'
        elif isinstance(error, UnicodeError):
            return 'encoding_error'
        elif 'too large' in str(error).lower():
            return 'oversized_chunk'
        elif 'parsing' in str(error).lower():
            return 'structure_parsing_error'
        else:
            return 'generic_error'

    def _handle_empty_content(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Empty content encountered: {error}")
        return ChunkingError(
            error_type='empty_content',
            message=str(error),
            recovery_action='skip_empty_content'
        )

    def _handle_oversized_chunk(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Oversized chunk detected: {error}")
        return ChunkingError(
            error_type='oversized_chunk',
            message=str(error),
            chunk_index=context.get('chunk_index'),
            recovery_action='reduce_chunk_size'
        )

    def _handle_encoding_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Encoding error: {error}")
        return ChunkingError(
            error_type='encoding_error',
            message=str(error),
            recovery_action='fallback_encoding'
        )

    def _handle_memory_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Memory error during chunking: {error}")
        return ChunkingError(
            error_type='memory_error',
            message=str(error),
            recovery_action='process_in_batches'
        )

    def _handle_structure_parsing_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.warning(f"Structure parsing failed: {error}")
        return ChunkingError(
            error_type='structure_parsing_error',
            message=str(error),
            recovery_action='fallback_to_fixed_size'
        )

    def _handle_generic_error(self, error: Exception, context: Dict[str, Any]) -> ChunkingError:
        self.logger.error(f"Unexpected error during chunking: {error}")
        return ChunkingError(
            error_type='generic_error',
            message=str(error),
            recovery_action='skip_and_continue'
        )
```
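
In practice the handler wraps a strategy call. A sketch of such a wrapper, using the classes defined earlier (the `safe_chunk` helper itself is illustrative, not part of the framework above):

```python
# Illustrative wrapper combining the error handler with the strategy classes above
handler = ChunkingErrorHandler()

def safe_chunk(strategy: ChunkingStrategy, content: str, analysis: DocumentAnalysis) -> List[Dict[str, Any]]:
    try:
        return strategy.chunk(content, analysis)
    except Exception as exc:
        chunking_error = handler.handle_error(exc, {'chunk_index': None})
        if chunking_error.recovery_action == 'fallback_to_fixed_size':
            # Structure parsing failed: retry with the simpler fixed-size strategy
            return FixedSizeStrategy().chunk(content, analysis)
        # Other recovery actions (skip, batch, re-encode) are handled upstream
        return []
```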

## Performance Optimization

### Caching and Memoization

```python
import hashlib
import json
import logging
import pickle
from typing import Dict, Any, List, Optional

import redis

class ChunkingCache:
    def __init__(self, redis_url: Optional[str] = None):
        if redis_url:
            self.redis_client = redis.from_url(redis_url)
        else:
            self.redis_client = None
        self.local_cache = {}

    def _generate_cache_key(self, content: str, strategy: str, params: Dict[str, Any]) -> str:
        content_hash = hashlib.md5(content.encode()).hexdigest()
        params_str = json.dumps(params, sort_keys=True)
        params_hash = hashlib.md5(params_str.encode()).hexdigest()
        return f"chunking:{strategy}:{content_hash}:{params_hash}"

    def get(self, content: str, strategy: str, params: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
        cache_key = self._generate_cache_key(content, strategy, params)

        # Try local cache first
        if cache_key in self.local_cache:
            return self.local_cache[cache_key]

        # Try Redis cache
        if self.redis_client:
            try:
                cached_data = self.redis_client.get(cache_key)
                if cached_data:
                    chunks = pickle.loads(cached_data)
                    self.local_cache[cache_key] = chunks  # Cache locally too
                    return chunks
            except Exception as e:
                logging.warning(f"Redis cache error: {e}")

        return None

    def set(self, content: str, strategy: str, params: Dict[str, Any], chunks: List[Dict[str, Any]]) -> None:
        cache_key = self._generate_cache_key(content, strategy, params)

        # Store in local cache
        self.local_cache[cache_key] = chunks

        # Store in Redis cache
        if self.redis_client:
            try:
                cached_data = pickle.dumps(chunks)
                self.redis_client.setex(cache_key, 3600, cached_data)  # 1 hour TTL
            except Exception as e:
                logging.warning(f"Redis cache set error: {e}")

    def clear_local_cache(self):
        self.local_cache.clear()

    def clear_redis_cache(self):
        if self.redis_client:
            pattern = "chunking:*"
            keys = self.redis_client.keys(pattern)
            if keys:
                self.redis_client.delete(*keys)
```
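
A cache-aside pattern around a strategy call might look like this; the `params` dictionary, strategy name, and Redis URL are examples, and `content` and `analysis` are assumed to come from the pipeline above:

```python
# Cache-aside sketch: check the cache, compute on a miss, then store the result
cache = ChunkingCache()  # or ChunkingCache("redis://localhost:6379/0") for a shared cache

params = {'chunk_size': 512, 'overlap': 50}
chunks = cache.get(content, 'fixed_size', params)
if chunks is None:
    chunks = FixedSizeStrategy().chunk(content, analysis)
    cache.set(content, 'fixed_size', params, chunks)
```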

### Batch Processing

```python
import asyncio
import concurrent.futures
import logging
from typing import List, Callable, Dict, Any

class BatchChunkingProcessor:
    def __init__(self, max_workers: int = 4, batch_size: int = 10):
        self.max_workers = max_workers
        self.batch_size = batch_size

    def process_documents_batch(self, documents: List[str],
                                chunking_function: Callable[[str], List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
        """Process multiple documents in parallel."""
        results = []

        # Process in batches to avoid memory issues
        for i in range(0, len(documents), self.batch_size):
            batch = documents[i:i + self.batch_size]

            with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                # Submit in order and collect in order so results line up with the input documents
                futures = [executor.submit(chunking_function, doc) for doc in batch]

                batch_results = []
                for future in futures:
                    try:
                        chunks = future.result()
                        batch_results.append(chunks)
                    except Exception as e:
                        logging.error(f"Error processing document: {e}")
                        batch_results.append([])  # Empty result for failed processing

            results.extend(batch_results)

        return results

    async def process_documents_async(self, documents: List[str],
                                      chunking_function: Callable[[str], List[Dict[str, Any]]]) -> List[List[Dict[str, Any]]]:
        """Process documents asynchronously."""
        semaphore = asyncio.Semaphore(self.max_workers)

        async def process_single_document(doc: str) -> List[Dict[str, Any]]:
            async with semaphore:
                # Run the synchronous chunking function in an executor
                loop = asyncio.get_event_loop()
                return await loop.run_in_executor(None, chunking_function, doc)

        tasks = [process_single_document(doc) for doc in documents]
        return await asyncio.gather(*tasks, return_exceptions=True)
```
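
A possible driver for the batch processor, reusing the analyzer and selector from earlier sections; this assumes the remaining strategy classes have been implemented, and the sample documents are placeholders:

```python
# Illustrative driver: chunk_one wires together the analysis and selection steps
processor = BatchChunkingProcessor(max_workers=4, batch_size=10)
analyzer = DocumentAnalyzer()
selector = StrategySelector()

def chunk_one(doc: str) -> List[Dict[str, Any]]:
    analysis = analyzer.analyze(doc)
    return selector.get_strategy(analysis).chunk(doc, analysis)

documents = ["First document text...", "Second document text..."]
all_chunks = processor.process_documents_batch(documents, chunk_one)
# Async variant: asyncio.run(processor.process_documents_async(documents, chunk_one))
```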

## Monitoring and Observability

### Metrics Collection

```python
import time
from dataclasses import dataclass
from typing import Dict, Any, List
from collections import defaultdict

@dataclass
class ChunkingMetrics:
    total_documents: int
    total_chunks: int
    avg_chunk_size: float
    processing_time: float
    memory_usage: float
    error_count: int
    strategy_distribution: Dict[str, int]

class MetricsCollector:
    def __init__(self):
        self.metrics = defaultdict(list)
        self.strategy_counts = defaultdict(int)
        self.start_time = None

    def start_timing(self):
        self.start_time = time.time()

    def end_timing(self) -> float:
        if self.start_time:
            duration = time.time() - self.start_time
            self.metrics['processing_time'].append(duration)
            self.start_time = None
            return duration
        return 0.0

    def record_chunk_count(self, count: int):
        self.metrics['chunk_count'].append(count)

    def record_chunk_size(self, size: int):
        self.metrics['chunk_size'].append(size)

    def record_strategy_usage(self, strategy: str):
        self.strategy_counts[strategy] += 1

    def record_error(self, error_type: str):
        self.metrics['errors'].append(error_type)

    def record_memory_usage(self, memory_mb: float):
        self.metrics['memory_usage'].append(memory_mb)

    def get_summary(self) -> ChunkingMetrics:
        return ChunkingMetrics(
            total_documents=len(self.metrics['processing_time']),
            total_chunks=sum(self.metrics['chunk_count']),
            avg_chunk_size=sum(self.metrics['chunk_size']) / max(len(self.metrics['chunk_size']), 1),
            processing_time=sum(self.metrics['processing_time']),
            memory_usage=sum(self.metrics['memory_usage']) / max(len(self.metrics['memory_usage']), 1),
            error_count=len(self.metrics['errors']),
            strategy_distribution=dict(self.strategy_counts)
        )

    def reset(self):
        self.metrics.clear()
        self.strategy_counts.clear()
        self.start_time = None
```
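
A sketch of instrumenting a single chunking run with the collector; `content` is assumed to come from the ingestion layer, and memory reporting (which would need something like psutil) is omitted:

```python
# Illustrative instrumentation of one document
collector = MetricsCollector()
analyzer = DocumentAnalyzer()
selector = StrategySelector()

collector.start_timing()
analysis = analyzer.analyze(content)
strategy_name = selector.select_strategy(analysis)
chunks = selector.strategies[strategy_name].chunk(content, analysis)
collector.end_timing()

collector.record_strategy_usage(strategy_name)
collector.record_chunk_count(len(chunks))
for chunk in chunks:
    collector.record_chunk_size(len(chunk['content'].split()))

print(collector.get_summary())
```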

This implementation guide provides a comprehensive foundation for building robust, scalable chunking systems that can handle various document types and use cases while maintaining high quality and performance.