# LLM Inference Optimization Skill

## When to Use This Skill

Use this skill when:

- Building production LLM applications with latency requirements
- Processing large batches of requests (classification, summarization, extraction)
- Optimizing cost for high-volume applications
- Improving throughput for batch processing
- Enhancing user experience with streaming
- Balancing cost, latency, and quality trade-offs

**When NOT to use:** Prototyping or single-query experiments where optimization is premature.

## Core Principle

**Performance is not automatic. Optimization is systematic.**

Without optimization:

- Sequential processing: 16 minutes for 1000 documents (~1 request/sec)
- No caching: 60% wasted cost on repeated queries
- Wrong model: 10× more expensive for the same quality
- No streaming: 40% bounce rate on long generations
- Single-objective tuning: poor cost-latency-quality trade-offs

**Formula:** Parallelization (10× throughput) + Caching (60% cost savings) + Model routing (balanced cost-quality) + Streaming (better UX) + Multi-objective optimization (Pareto optimal) = Production-ready performance.

## Optimization Framework

```
┌─────────────────────────────────────────┐
│ 1. Measure Baseline                     │
│    Latency, Cost, Quality, Throughput   │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 2. Set Requirements                     │
│    Acceptable latency, Budget, Quality  │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 3. Apply Optimizations                  │
│    Parallelization → Caching → Routing  │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 4. Evaluate Trade-offs                  │
│    Cost vs Latency vs Quality (Pareto)  │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 5. Monitor Production                   │
│    Track metrics, Detect regressions    │
└─────────────────────────────────────────┘
```

## Part 1: Parallelization

### Async/Await for Concurrent Requests

**Problem:** Sequential API calls are slow (1 request/sec).

**Solution:** Concurrent requests with async/await (10-20 requests/sec).

```python
import asyncio
from typing import List

import openai


async def classify_async(text: str, semaphore: asyncio.Semaphore) -> str:
    """
    Classify text asynchronously with rate limiting.

    Args:
        text: Text to classify
        semaphore: Limits concurrent requests

    Returns:
        Classification result
    """
    async with semaphore:
        response = await openai.ChatCompletion.acreate(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                {"role": "user", "content": text}
            ]
        )
        return response.choices[0].message.content


async def classify_batch_parallel(
    texts: List[str],
    concurrency: int = 10
) -> List[str]:
    """
    Classify multiple texts in parallel.

    Args:
        texts: List of texts to classify
        concurrency: Maximum concurrent requests (default 10)

    Returns:
        List of classification results
    """
    semaphore = asyncio.Semaphore(concurrency)
    tasks = [classify_async(text, semaphore) for text in texts]
    results = await asyncio.gather(*tasks)
    return results


# Example usage
texts = ["Great product!", "Terrible service.", "It's okay."] * 333  # ~1000 texts

# Sequential: 1000 requests × 1 second = 1000 seconds (16.7 minutes)
# Parallel (concurrency=10): 1000 requests / 10 = 100 seconds (1.7 minutes) - 10× FASTER!
results = asyncio.run(classify_batch_parallel(texts, concurrency=10))
print(f"Classified {len(results)} texts")
```
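In practice, high concurrency quickly runs into provider rate limits, so parallel callers usually pair the semaphore with retries. Below is a minimal sketch of exponential backoff with jitter; the `with_retries` wrapper is a hypothetical helper (not part of any SDK) shown wrapping the `classify_async` calls from above.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    make_call: Callable[[], Awaitable[T]],
    max_retries: int = 5,
    base_delay: float = 1.0,
) -> T:
    """Retry an async call with exponential backoff plus jitter.

    make_call is a zero-argument callable that returns a fresh coroutine,
    e.g. lambda: classify_async(text, semaphore).
    """
    for attempt in range(max_retries):
        try:
            return await make_call()
        except Exception as exc:  # in production, catch rate-limit/timeout errors specifically
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            print(f"Attempt {attempt + 1} failed ({exc!r}); retrying in {delay:.1f}s")
            await asyncio.sleep(delay)
    raise RuntimeError("unreachable")


# Inside classify_batch_parallel, wrap each task:
#   tasks = [with_retries(lambda t=text: classify_async(t, semaphore)) for text in texts]
```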
**Performance comparison:**

| Approach | Time | Throughput | Cost |
|----------|------|------------|------|
| Sequential | 1000s (16.7 min) | 1 req/sec | $2.00 |
| Parallel (10) | 100s (1.7 min) | 10 req/sec | $2.00 (same!) |
| Parallel (20) | 50s (0.8 min) | 20 req/sec | $2.00 (same!) |

**Key insight:** Parallelization is **free performance**. Same cost, 10-20× faster.

### OpenAI Batch API (Offline Processing)

**Problem:** The real-time API is expensive for large batch jobs.

**Solution:** Batch API (50% cheaper, 24-hour completion window).

```python
import time
from typing import List

import jsonlines
from openai import OpenAI  # Batch endpoints live under the openai>=1.0 client

client = OpenAI()


def create_batch_job(texts: List[str]) -> str:
    """
    Submit a batch job for offline processing (50% cost reduction).

    Args:
        texts: List of texts to process

    Returns:
        Batch job ID
    """
    # Step 1: Create batch input file (JSONL format)
    batch_input = []
    for i, text in enumerate(texts):
        batch_input.append({
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [
                    {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                    {"role": "user", "content": text}
                ]
            }
        })

    with jsonlines.open("batch_input.jsonl", "w") as writer:
        writer.write_all(batch_input)

    # Step 2: Upload file
    with open("batch_input.jsonl", "rb") as f:
        file_response = client.files.create(file=f, purpose="batch")

    # Step 3: Create batch job
    batch_job = client.batches.create(
        input_file_id=file_response.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"  # Complete within 24 hours
    )

    print(f"Batch job created: {batch_job.id}")
    print(f"Status: {batch_job.status}")
    return batch_job.id


def check_batch_status(batch_id: str) -> str:
    """Check batch job status and download results when complete."""
    batch = client.batches.retrieve(batch_id)
    print(f"Status: {batch.status}")
    print(f"Completed: {batch.request_counts.completed}/{batch.request_counts.total}")

    if batch.status == "completed":
        # Download results
        result = client.files.content(batch.output_file_id)
        with open("batch_results.jsonl", "w") as f:
            f.write(result.text)
        print("Results saved to batch_results.jsonl")

    return batch.status


# Example usage
texts = ["Great product!"] * 10000  # 10,000 texts

# Submit batch job
batch_id = create_batch_job(texts)

# Poll status every 10 minutes
while True:
    status = check_batch_status(batch_id)
    if status == "completed":
        break
    time.sleep(600)

# Cost: $10 (Batch API) vs $20 (real-time API) = 50% savings!
```

**When to use Batch API:**

| Use Case | Real-time API | Batch API |
|----------|--------------|-----------|
| User-facing chat | ✓ (latency critical) | ✗ |
| Document classification (10k docs) | ✗ (expensive) | ✓ (50% cheaper) |
| Nightly data processing | ✗ | ✓ |
| A/B test evaluation | ✗ | ✓ |
| Real-time search | ✓ | ✗ |
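To decide whether a job is worth routing through the Batch API, it helps to put numbers on the 50% discount. A small sketch, using the GPT-3.5-turbo prices quoted later in this skill ($0.0015/1k input, $0.002/1k output) as assumed values:

```python
def estimate_job_cost(
    n_requests: int,
    avg_input_tokens: int,
    avg_output_tokens: int,
    price_per_1k_input: float = 0.0015,  # assumed GPT-3.5-turbo pricing
    price_per_1k_output: float = 0.002,
    batch_discount: float = 0.5,         # Batch API is ~50% cheaper
) -> dict:
    """Compare real-time vs Batch API cost for an offline job."""
    per_request = (
        avg_input_tokens / 1000 * price_per_1k_input
        + avg_output_tokens / 1000 * price_per_1k_output
    )
    realtime = n_requests * per_request
    batch = realtime * (1 - batch_discount)
    return {"realtime": realtime, "batch": batch, "savings": realtime - batch}


# 10,000 classifications at ~100 input / 10 output tokens each
print(estimate_job_cost(10_000, 100, 10))
# roughly: $1.70 real-time vs $0.85 batch ($0.85 saved)
```

If nothing consumes the results in real time and the deadline exceeds the 24-hour completion window, the discount comes with no real trade-off.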
## Part 2: Caching

### Answer Caching (Repeated Queries)

**Problem:** 60-70% of queries are repeated (FAQs, common questions).

**Solution:** Cache answers for identical queries (60% cost reduction).

```python
import hashlib
from typing import Optional

import openai


class AnswerCache:
    def __init__(self):
        self.cache = {}  # In-memory cache (use Redis for production)

    def _cache_key(self, query: str, model: str = "gpt-3.5-turbo") -> str:
        """Generate cache key from query and model."""
        # Normalize query (lowercase, strip whitespace)
        normalized = query.lower().strip()
        # Hash for a consistent key
        key_data = f"{model}:{normalized}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, query: str, model: str = "gpt-3.5-turbo") -> Optional[str]:
        """Get cached answer if it exists."""
        key = self._cache_key(query, model)
        return self.cache.get(key)

    def set(self, query: str, answer: str, model: str = "gpt-3.5-turbo"):
        """Cache answer for query."""
        key = self._cache_key(query, model)
        self.cache[key] = answer

    def stats(self):
        """Get cache statistics."""
        return {
            "cache_size": len(self.cache),
            "memory_bytes": sum(len(v.encode()) for v in self.cache.values())
        }


def answer_with_cache(
    query: str,
    cache: AnswerCache,
    model: str = "gpt-3.5-turbo"
) -> tuple[str, bool]:
    """
    Answer query with caching.

    Returns:
        (answer, cache_hit)
    """
    # Check cache
    cached_answer = cache.get(query, model)
    if cached_answer:
        return cached_answer, True  # Cache hit!

    # Cache miss: generate answer
    response = openai.ChatCompletion.create(
        model=model,
        messages=[
            {"role": "system", "content": "Answer the question concisely."},
            {"role": "user", "content": query}
        ]
    )
    answer = response.choices[0].message.content

    # Cache for future queries
    cache.set(query, answer, model)
    return answer, False


# Example usage
cache = AnswerCache()

queries = [
    "What is your return policy?",
    "How do I track my order?",
    "What is your return policy?",          # Repeated!
    "Do you offer international shipping?",
    "What is your return policy?",          # Repeated again!
]

cache_hits = 0
cache_misses = 0

for query in queries:
    answer, is_cache_hit = answer_with_cache(query, cache)
    if is_cache_hit:
        cache_hits += 1
        print(f"[CACHE HIT] {query}")
    else:
        cache_misses += 1
        print(f"[CACHE MISS] {query}")
    print(f"Answer: {answer}\n")

print(f"Cache hits: {cache_hits}/{len(queries)} ({cache_hits/len(queries)*100:.1f}%)")
print(f"Cost savings: {cache_hits/len(queries)*100:.1f}%")

# Output:
# [CACHE MISS] What is your return policy?
# [CACHE MISS] How do I track my order?
# [CACHE HIT] What is your return policy?
# [CACHE MISS] Do you offer international shipping?
# [CACHE HIT] What is your return policy?
# Cache hits: 2/5 (40.0%)
# Cost savings: 40.0%
```

**Production caching with Redis:**

```python
import hashlib
from typing import Optional

import redis


class RedisAnswerCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.ttl = 86400  # 24 hours

    def _cache_key(self, query: str, model: str) -> str:
        normalized = query.lower().strip()
        return f"answer:{model}:{hashlib.md5(normalized.encode()).hexdigest()}"

    def get(self, query: str, model: str = "gpt-3.5-turbo") -> Optional[str]:
        key = self._cache_key(query, model)
        cached = self.redis_client.get(key)
        return cached.decode() if cached else None

    def set(self, query: str, answer: str, model: str = "gpt-3.5-turbo"):
        key = self._cache_key(query, model)
        self.redis_client.setex(key, self.ttl, answer)

    def stats(self):
        return {
            "cache_size": self.redis_client.dbsize(),
            "memory_usage": self.redis_client.info("memory")["used_memory_human"]
        }
```
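The payoff from answer caching depends almost entirely on the hit rate. Here is a quick back-of-the-envelope estimator; the per-call latency and cost figures are assumptions consistent with the monitoring example later in this skill (~800 ms / ~$0.002 per API call, ~100 ms / ~$0 per cache hit), not measurements.

```python
def cache_effect(
    hit_rate: float,
    api_latency_ms: float = 800.0,    # assumed latency/cost of a cache miss
    api_cost: float = 0.002,
    cache_latency_ms: float = 100.0,  # assumed latency/cost of a cache hit
    cache_cost: float = 0.0,
) -> dict:
    """Expected per-query latency and cost at a given cache hit rate."""
    latency = hit_rate * cache_latency_ms + (1 - hit_rate) * api_latency_ms
    cost = hit_rate * cache_cost + (1 - hit_rate) * api_cost
    return {
        "avg_latency_ms": latency,
        "avg_cost": cost,
        "cost_savings_pct": 100 * (1 - cost / api_cost),
    }


print(cache_effect(hit_rate=0.6))
# roughly: 380 ms average latency, $0.0008 average cost, 60% cost savings
```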
### Prompt Caching (Static Context)

**Problem:** RAG sends the same context repeatedly (expensive).

**Solution:** Anthropic prompt caching (90% cost reduction for static context).

```python
import anthropic


def rag_with_prompt_caching(
    query: str,
    context: str,  # Static context (knowledge base)
    model: str = "claude-3-5-sonnet-20240620"
):
    """
    RAG with prompt caching for static context.

    First query: full price, plus a 25% cache-write premium on the cached tokens
    Subsequent queries: ~90% discount on the cached context
    """
    client = anthropic.Anthropic()

    response = client.messages.create(
        model=model,
        max_tokens=500,
        system=[
            {
                "type": "text",
                "text": "Answer questions using only the provided context.",
            },
            {
                "type": "text",
                "text": f"Context:\n{context}",
                "cache_control": {"type": "ephemeral"}  # Cache this!
            }
        ],
        messages=[
            {"role": "user", "content": query}
        ]
    )

    return response.content[0].text


# Example
knowledge_base = """
[Large knowledge base with 50,000 tokens of product info, policies, FAQs...]
"""

# Query 1: writes the context to the cache
answer1 = rag_with_prompt_caching("What is your return policy?", knowledge_base)
# Cost: cache writes are billed at 1.25× the base input rate:
#   50k tokens × $0.00375/1k ≈ $0.19

# Queries 2-100: ~90% discount on the cached context!
answer2 = rag_with_prompt_caching("How do I track my order?", knowledge_base)
# Cost: cached input (50k × $0.0003/1k = $0.015) + query (20 tokens × $0.003/1k ≈ $0.00006) ≈ $0.015

# Without caching, every query would pay the full 50k-token input (50k × $0.003/1k = $0.15).
# With caching, queries 2-100 pay ≈ $0.015 each, roughly a 90% reduction per query.
```

**When prompt caching is effective:**

| Scenario | Static Context | Dynamic Content | Cache Savings |
|----------|----------------|-----------------|---------------|
| RAG with knowledge base | 50k tokens (policies, products) | Query (20 tokens) | ~90% |
| Multi-turn chat with instructions | 1k tokens (system message) | Conversation (varying) | 60-80% |
| Document analysis | 10k tokens (document) | Multiple questions | 90%+ |
| Code review with context | 5k tokens (codebase) | Review comments | 85%+ |
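Before enabling prompt caching, it is worth checking that the 25% cache-write premium is amortized by enough follow-up queries against the same context. Here is a sketch using the Claude Sonnet prices assumed above ($0.003/1k base input, $0.00375/1k cache write, $0.0003/1k cache read); note that the ephemeral cache has a short TTL, so this assumes the queries arrive close enough together to keep it warm.

```python
def prompt_cache_savings(
    context_tokens: int,
    n_queries: int,
    base_in: float = 0.003 / 1000,   # assumed per-token base input price
    write: float = 0.00375 / 1000,   # cache write (1.25× base)
    read: float = 0.0003 / 1000,     # cache read (0.1× base)
) -> dict:
    """Total context cost with vs without prompt caching (query/output tokens ignored)."""
    without = n_queries * context_tokens * base_in
    with_cache = context_tokens * write + (n_queries - 1) * context_tokens * read
    return {
        "without_caching": round(without, 2),
        "with_caching": round(with_cache, 2),
        "savings_pct": round(100 * (1 - with_cache / without), 1),
    }


print(prompt_cache_savings(context_tokens=50_000, n_queries=100))
# roughly: without ≈ $15.00, with ≈ $1.67, i.e. ~89% savings

print(prompt_cache_savings(context_tokens=50_000, n_queries=1))
# a single query is strictly worse with caching: you pay the write premium for nothing
```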
## Part 3: Model Routing

### Task-Based Model Selection

**Problem:** Using GPT-4 for everything is 10× more expensive than necessary.

**Solution:** Route by task complexity (GPT-3.5 for simple tasks, GPT-4 for complex ones).

```python
from enum import Enum
from typing import Dict


class TaskType(Enum):
    CLASSIFICATION = "classification"
    EXTRACTION = "extraction"
    SUMMARIZATION = "summarization"
    TRANSLATION = "translation"
    REASONING = "reasoning"
    CREATIVE = "creative"
    CODE_GENERATION = "code_generation"


class ModelRouter:
    """Route queries to the appropriate model based on task complexity."""

    # Model configurations
    MODELS = {
        "gpt-3.5-turbo": {
            "cost_per_1k_input": 0.0015,
            "cost_per_1k_output": 0.002,
            "latency_factor": 1.0,  # Baseline
            "quality_score": 0.85
        },
        "gpt-4": {
            "cost_per_1k_input": 0.03,
            "cost_per_1k_output": 0.06,
            "latency_factor": 2.5,
            "quality_score": 0.95
        },
        "gpt-4-turbo": {
            "cost_per_1k_input": 0.01,
            "cost_per_1k_output": 0.03,
            "latency_factor": 1.5,
            "quality_score": 0.94
        }
    }

    # Task → Model mapping
    TASK_ROUTING = {
        TaskType.CLASSIFICATION: "gpt-3.5-turbo",  # Simple task
        TaskType.EXTRACTION: "gpt-3.5-turbo",
        TaskType.SUMMARIZATION: "gpt-3.5-turbo",
        TaskType.TRANSLATION: "gpt-3.5-turbo",
        TaskType.REASONING: "gpt-4",           # Complex reasoning
        TaskType.CREATIVE: "gpt-4",            # Better creativity
        TaskType.CODE_GENERATION: "gpt-4"      # Better coding
    }

    @classmethod
    def route(cls, task_type: TaskType, complexity: str = "medium") -> str:
        """
        Route to the appropriate model.

        Args:
            task_type: Type of task
            complexity: "low", "medium", "high"

        Returns:
            Model name
        """
        base_model = cls.TASK_ROUTING[task_type]

        # Override for high complexity
        if complexity == "high" and base_model == "gpt-3.5-turbo":
            return "gpt-4-turbo"  # Upgrade for complex variants

        return base_model

    @classmethod
    def calculate_cost(cls, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for a model."""
        config = cls.MODELS[model]
        input_cost = (input_tokens / 1000) * config["cost_per_1k_input"]
        output_cost = (output_tokens / 1000) * config["cost_per_1k_output"]
        return input_cost + output_cost

    @classmethod
    def compare_models(cls, task_type: TaskType, input_tokens: int = 500, output_tokens: int = 200):
        """Compare models for a task."""
        print(f"\nTask: {task_type.value}")
        print(f"Input: {input_tokens} tokens, Output: {output_tokens} tokens\n")

        for model_name, config in cls.MODELS.items():
            cost = cls.calculate_cost(model_name, input_tokens, output_tokens)
            quality = config["quality_score"]
            latency = config["latency_factor"]

            print(f"{model_name}:")
            print(f"  Cost: ${cost:.4f}")
            print(f"  Quality: {quality:.0%}")
            print(f"  Latency: {latency:.1f}× baseline")
            print(f"  Cost per quality point: ${cost/quality:.4f}\n")


# Example usage
router = ModelRouter()

# Classification task
model = router.route(TaskType.CLASSIFICATION, complexity="low")
print(f"Classification → {model}")  # gpt-3.5-turbo

# Complex reasoning task
model = router.route(TaskType.REASONING, complexity="high")
print(f"Complex reasoning → {model}")  # gpt-4

# Compare costs
router.compare_models(TaskType.CLASSIFICATION, input_tokens=500, output_tokens=200)

# Output (abridged):
# gpt-3.5-turbo: ≈$0.0012 (cost per quality point ≈ $0.0014)
# gpt-4:         ≈$0.0270 (cost per quality point ≈ $0.0284) - about 23× more expensive!
# Recommendation: use GPT-3.5 for classification (~23× cheaper, acceptable quality)
```
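The router takes a `complexity` label but does not say where it comes from; in practice this is usually a cheap heuristic or a small classifier run before routing. Here is a naive heuristic sketch; the keyword list and length thresholds are illustrative assumptions, not a validated classifier.

```python
REASONING_HINTS = (
    "why", "prove", "derive", "step by step", "trade-off",
    "compare", "debug", "optimize", "design",
)


def estimate_complexity(prompt: str) -> str:
    """Rough 'low' / 'medium' / 'high' label for routing purposes."""
    text = prompt.lower()
    hints = sum(1 for keyword in REASONING_HINTS if keyword in text)
    if len(text) > 2000 or hints >= 2:
        return "high"
    if len(text) > 500 or hints == 1:
        return "medium"
    return "low"


# Combined with the router above:
#   model = ModelRouter.route(TaskType.REASONING, complexity=estimate_complexity(prompt))
```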
### Model Cascade (Try Cheap First)

**Problem:** You don't know whether a task needs GPT-4 until you try.

**Solution:** Try GPT-3.5 first, escalate to GPT-4 if quality is insufficient.

```python
import openai


def cascade_generation(
    prompt: str,
    quality_threshold: float = 0.8,
    max_attempts: int = 2
) -> tuple[str, str, float]:
    """
    Try the cheaper model first, escalate if quality is insufficient.

    Args:
        prompt: User prompt
        quality_threshold: Minimum quality score (0-1)
        max_attempts: Max escalation attempts

    Returns:
        (response, model_used, estimated_quality)
    """
    models = ["gpt-3.5-turbo", "gpt-4-turbo", "gpt-4"]

    for model in models[:max_attempts]:
        response = openai.ChatCompletion.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        result = response.choices[0].message.content

        # Estimate quality (simplified - use LLM-as-judge in production)
        quality = estimate_quality(result, prompt)

        if quality >= quality_threshold:
            print(f"✓ {model} met quality threshold ({quality:.2f} >= {quality_threshold})")
            return result, model, quality
        else:
            print(f"✗ {model} below threshold ({quality:.2f} < {quality_threshold}), escalating...")

    # Return the last attempt even if below threshold
    return result, model, quality


def estimate_quality(response: str, prompt: str) -> float:
    """
    Estimate quality score (0-1).

    Production: use LLM-as-judge or other quality metrics.
    """
    # Simplified heuristic
    if len(response) < 20:
        return 0.3  # Too short
    elif len(response) > 500:
        return 0.9  # Detailed
    else:
        return 0.7  # Moderate


# Example
prompt = "Explain quantum entanglement in simple terms."
result, model, quality = cascade_generation(prompt, quality_threshold=0.8)

print(f"\nFinal result:")
print(f"Model: {model}")
print(f"Quality: {quality:.2f}")
print(f"Response: {result[:200]}...")

# Average case: GPT-3.5 suffices (90% of queries)
#   Cost: $0.002 per query
# Complex case: escalate to GPT-4 (10% of queries)
#   Cost: $0.002 (GPT-3.5 attempt) + $0.030 (GPT-4) = $0.032
# Overall cost: 0.9 × $0.002 + 0.1 × $0.032 = $0.0018 + $0.0032 = $0.005
# vs always GPT-4: $0.030
# Savings: 83%!
```

## Part 4: Streaming

### Streaming for Long-Form Generation

**Problem:** A 20-second wait for the full article (40% bounce rate).

**Solution:** Stream tokens as they are generated (perceived latency: 0.5s).

```python
import openai


def generate_streaming(prompt: str, model: str = "gpt-4"):
    """
    Generate a response with streaming.

    Benefits:
    - First token in ~0.5s (vs a 20s wait)
    - User sees progress (engagement)
    - Can cancel early if needed
    """
    response = openai.ChatCompletion.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=2000,
        stream=True  # Enable streaming
    )

    full_response = ""
    for chunk in response:
        if chunk.choices[0].delta.get("content"):
            token = chunk.choices[0].delta.content
            full_response += token
            print(token, end="", flush=True)  # Display immediately

    print()  # Newline
    return full_response


# Example
prompt = "Write a detailed article about the history of artificial intelligence."

# Without streaming: wait 20s, then see the full article
# With streaming: see the first words in ~0.5s, smooth output for the next 20s
article = generate_streaming(prompt)

# User experience improvement:
# - Perceived latency: 20s → 0.5s (40× better!)
# - Bounce rate: 40% → 5% (35pp improvement!)
# - Satisfaction: 3.2/5 → 4.3/5 (+1.1 points!)
```

### Streaming in Web Applications

**Flask with Server-Sent Events (SSE):**

```python
from flask import Flask, Response, request
import openai

app = Flask(__name__)


@app.route('/generate', methods=['POST'])
def generate_stream():
    """Stream generation results to the frontend."""
    prompt = request.json.get('prompt')

    def event_stream():
        """Generator for SSE."""
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        for chunk in response:
            if chunk.choices[0].delta.get("content"):
                token = chunk.choices[0].delta.content
                # SSE format: "data: {content}\n\n"
                yield f"data: {token}\n\n"

        # Signal completion
        yield "data: [DONE]\n\n"

    return Response(event_stream(), mimetype="text/event-stream")


# Frontend (JavaScript). Note: EventSource only supports GET requests,
# so a POST endpoint like this one is consumed with fetch() and a streaming reader:
"""
const response = await fetch('/generate', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({prompt: userPrompt})
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    // Each SSE event arrives as "data: <token>\n\n"
    for (const event of decoder.decode(value).split('\n\n')) {
        const data = event.replace(/^data: /, '');
        if (!data || data === '[DONE]') continue;
        document.getElementById('output').innerText += data;
    }
}
"""
```
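When evaluating streaming, the number to track is time to first token (TTFT) rather than total generation time. Below is a small helper sketch (the function name and structure are mine) that wraps any token iterator, such as the streaming loops above.

```python
import time
from typing import Iterable, Tuple


def measure_stream(tokens: Iterable[str]) -> Tuple[str, float, float]:
    """Consume a token stream; return (text, time_to_first_token_s, total_time_s)."""
    start = time.perf_counter()
    first_token_at = None
    parts = []
    for token in tokens:
        if first_token_at is None:
            first_token_at = time.perf_counter()
        parts.append(token)
    end = time.perf_counter()
    ttft = (first_token_at - start) if first_token_at is not None else float("nan")
    return "".join(parts), ttft, end - start


# Example with a stand-in generator (swap in the real streaming loop):
text, ttft, total = measure_stream(tok for tok in ["Stream", "ing ", "works."])
print(f"TTFT: {ttft*1000:.1f}ms, total: {total*1000:.1f}ms")
```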
## Part 5: Cost-Latency-Quality Trade-offs

### Multi-Objective Optimization

**Problem:** Optimizing a single objective (cost OR latency) leads to poor trade-offs.

**Solution:** Pareto analysis to find balanced solutions.

```python
from typing import List


class OptimizationOption:
    def __init__(
        self,
        name: str,
        latency_p95: float,    # seconds
        cost_per_1k: float,    # dollars
        quality_score: float   # 0-1
    ):
        self.name = name
        self.latency_p95 = latency_p95
        self.cost_per_1k = cost_per_1k
        self.quality_score = quality_score

    def dominates(self, other: 'OptimizationOption') -> bool:
        """Check if this option dominates another (Pareto dominance)."""
        # Dominates if: better or equal in all dimensions, strictly better in at least one
        better_latency = self.latency_p95 <= other.latency_p95
        better_cost = self.cost_per_1k <= other.cost_per_1k
        better_quality = self.quality_score >= other.quality_score

        strictly_better = (
            self.latency_p95 < other.latency_p95 or
            self.cost_per_1k < other.cost_per_1k or
            self.quality_score > other.quality_score
        )

        return better_latency and better_cost and better_quality and strictly_better

    def __repr__(self):
        return f"{self.name}: {self.latency_p95:.2f}s, ${self.cost_per_1k:.3f}/1k, {self.quality_score:.2f} quality"


def find_pareto_optimal(options: List[OptimizationOption]) -> List[OptimizationOption]:
    """Find Pareto optimal solutions (non-dominated options)."""
    pareto_optimal = []

    for option in options:
        is_dominated = False
        for other in options:
            if other.dominates(option):
                is_dominated = True
                break
        if not is_dominated:
            pareto_optimal.append(option)

    return pareto_optimal


# Example: RAG chatbot optimization
options = [
    OptimizationOption("GPT-4, no caching", latency_p95=2.5, cost_per_1k=10.0, quality_score=0.92),
    OptimizationOption("GPT-3.5, no caching", latency_p95=0.8, cost_per_1k=2.0, quality_score=0.78),
    OptimizationOption("GPT-3.5 + caching", latency_p95=0.6, cost_per_1k=1.2, quality_score=0.78),
    OptimizationOption("GPT-3.5 + caching + prompt eng", latency_p95=0.7, cost_per_1k=1.3, quality_score=0.85),
    OptimizationOption("GPT-4 + caching", latency_p95=2.0, cost_per_1k=6.0, quality_score=0.92),
    OptimizationOption("GPT-4-turbo + caching", latency_p95=1.2, cost_per_1k=4.0, quality_score=0.90),
]

# Find Pareto optimal options
pareto = find_pareto_optimal(options)

print("Pareto Optimal Solutions:")
for opt in pareto:
    print(f"  {opt}")

# Output:
# Pareto Optimal Solutions:
#   GPT-3.5 + caching: 0.60s, $1.200/1k, 0.78 quality
#   GPT-3.5 + caching + prompt eng: 0.70s, $1.300/1k, 0.85 quality
#   GPT-4 + caching: 2.00s, $6.000/1k, 0.92 quality
#   GPT-4-turbo + caching: 1.20s, $4.000/1k, 0.90 quality

# Interpretation:
# - If cost is the only concern: GPT-3.5 + caching ($1.20/1k, 0.78 quality)
# - If budget-conscious but quality matters: GPT-3.5 + caching + prompt eng ($1.30/1k, 0.85 quality)
# - If quality-critical: GPT-4-turbo + caching ($4/1k, 0.90 quality, faster than GPT-4)
# - If maximum quality is needed: GPT-4 + caching ($6/1k, 0.92 quality)
```
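When a single winner is needed without hard constraints, another common approach is to scalarize the three objectives into one weighted score and take the maximum. Here is a sketch over the `options` list above; the weights and normalization constants are arbitrary assumptions chosen for illustration.

```python
def weighted_score(
    opt: OptimizationOption,
    w_quality: float = 0.5,
    w_cost: float = 0.3,
    w_latency: float = 0.2,
    max_cost: float = 10.0,    # assumed normalization upper bounds
    max_latency: float = 3.0,
) -> float:
    """Higher is better: reward quality, penalize normalized cost and latency."""
    return (
        w_quality * opt.quality_score
        - w_cost * (opt.cost_per_1k / max_cost)
        - w_latency * (opt.latency_p95 / max_latency)
    )


best = max(options, key=weighted_score)
print(f"Best by weighted score: {best}")
# With these example weights: GPT-3.5 + caching + prompt eng
```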
### Requirements-Based Selection

```python
def select_optimal_solution(
    options: List[OptimizationOption],
    max_latency: float = None,
    max_cost: float = None,
    min_quality: float = None
) -> OptimizationOption:
    """
    Select the optimal solution given constraints.

    Args:
        options: Available options
        max_latency: Maximum acceptable latency (seconds)
        max_cost: Maximum cost per 1k queries (dollars)
        min_quality: Minimum quality score (0-1)

    Returns:
        Best option meeting all constraints
    """
    # Filter options meeting constraints
    feasible = []
    for opt in options:
        meets_latency = max_latency is None or opt.latency_p95 <= max_latency
        meets_cost = max_cost is None or opt.cost_per_1k <= max_cost
        meets_quality = min_quality is None or opt.quality_score >= min_quality

        if meets_latency and meets_cost and meets_quality:
            feasible.append(opt)

    if not feasible:
        raise ValueError("No solution meets all constraints")

    # Among feasible options, select the best cost-quality trade-off
    best = min(feasible, key=lambda opt: opt.cost_per_1k / opt.quality_score)
    return best


# Example: Requirements
requirements = {
    "max_latency": 1.0,   # Must respond within 1 second
    "max_cost": 5.0,      # Budget: $5 per 1k queries
    "min_quality": 0.85   # Minimum 85% quality
}

selected = select_optimal_solution(
    options,
    max_latency=requirements["max_latency"],
    max_cost=requirements["max_cost"],
    min_quality=requirements["min_quality"]
)

print(f"Selected solution: {selected}")
# Output: GPT-3.5 + caching + prompt eng: 0.70s, $1.300/1k, 0.85 quality
# (Meets all constraints, most cost-effective)
```

## Part 6: Production Monitoring

### Performance Metrics Tracking

```python
from dataclasses import dataclass
from typing import Dict, List

import numpy as np


@dataclass
class QueryMetrics:
    """Metrics for a single query."""
    latency_ms: float
    input_tokens: int
    output_tokens: int
    cost: float
    cache_hit: bool
    model: str


class PerformanceMonitor:
    """Track and analyze performance metrics."""

    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def log_query(
        self,
        latency_ms: float,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        cache_hit: bool,
        model: str
    ):
        """Log query metrics."""
        self.metrics.append(QueryMetrics(
            latency_ms=latency_ms,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            cache_hit=cache_hit,
            model=model
        ))

    def summary(self) -> Dict:
        """Generate summary statistics."""
        if not self.metrics:
            return {}

        latencies = [m.latency_ms for m in self.metrics]
        costs = [m.cost for m in self.metrics]
        cache_hits = [m.cache_hit for m in self.metrics]

        return {
            "total_queries": len(self.metrics),
            "latency_p50": np.percentile(latencies, 50),
            "latency_p95": np.percentile(latencies, 95),
            "latency_p99": np.percentile(latencies, 99),
            "avg_cost": np.mean(costs),
            "total_cost": np.sum(costs),
            "cache_hit_rate": np.mean(cache_hits) * 100,
            "queries_per_model": self._count_by_model()
        }

    def _count_by_model(self) -> Dict[str, int]:
        """Count queries by model."""
        counts = {}
        for m in self.metrics:
            counts[m.model] = counts.get(m.model, 0) + 1
        return counts


# Example usage
monitor = PerformanceMonitor()

# Simulate queries
for _ in range(1000):
    cache_hit = np.random.random() < 0.6       # 60% cache hit rate
    latency = 100 if cache_hit else 800        # Cache: 100ms, API: 800ms
    cost = 0 if cache_hit else 0.002

    monitor.log_query(
        latency_ms=latency,
        input_tokens=500,
        output_tokens=200,
        cost=cost,
        cache_hit=cache_hit,
        model="gpt-3.5-turbo"
    )

# Generate summary
summary = monitor.summary()

print("Performance Summary:")
print(f"  Total queries: {summary['total_queries']}")
print(f"  Latency P50: {summary['latency_p50']:.0f}ms")
print(f"  Latency P95: {summary['latency_p95']:.0f}ms")
print(f"  Avg cost: ${summary['avg_cost']:.4f}")
print(f"  Total cost: ${summary['total_cost']:.2f}")
print(f"  Cache hit rate: {summary['cache_hit_rate']:.1f}%")
```
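Step 5 of the framework calls for detecting regressions, which the monitor above does not yet do. Here is a minimal sketch that compares the current `summary()` against a stored baseline and emits alerts; the tolerance thresholds are illustrative assumptions.

```python
def detect_regressions(
    baseline: dict,
    current: dict,
    latency_tolerance: float = 1.2,    # flag if p95 latency grows by more than 20%
    cost_tolerance: float = 1.2,       # flag if average cost grows by more than 20%
    min_cache_hit_rate: float = 50.0,  # flag if hit rate drops below 50%
) -> list:
    """Compare two PerformanceMonitor.summary() dicts and return alert strings."""
    alerts = []
    if current["latency_p95"] > baseline["latency_p95"] * latency_tolerance:
        alerts.append(
            f"p95 latency regression: {baseline['latency_p95']:.0f}ms -> {current['latency_p95']:.0f}ms"
        )
    if current["avg_cost"] > baseline["avg_cost"] * cost_tolerance:
        alerts.append(
            f"cost regression: ${baseline['avg_cost']:.4f} -> ${current['avg_cost']:.4f}"
        )
    if current["cache_hit_rate"] < min_cache_hit_rate:
        alerts.append(f"cache hit rate dropped to {current['cache_hit_rate']:.1f}%")
    return alerts


# baseline = monitor.summary()   # captured after the last known-good deploy
# alerts = detect_regressions(baseline, monitor.summary())
# if alerts: page someone or fail the canary
```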
## Summary

**Inference optimization is systematic, not ad-hoc.**

**Core techniques:**

1. **Parallelization:** Async/await (10× throughput), Batch API (50% cheaper)
2. **Caching:** Answer caching (60% savings), Prompt caching (90% savings)
3. **Model routing:** GPT-3.5 for simple tasks (10× cheaper), GPT-4 for complex tasks
4. **Streaming:** First token in 0.5s (vs a 20s wait), 35pp better completion rate
5. **Multi-objective:** Pareto analysis (balance cost, latency, and quality)

**Checklist:**

1. ✓ Measure baseline (latency, cost, quality)
2. ✓ Set requirements (acceptable latency, budget, quality threshold)
3. ✓ Parallelize batch processing (10× throughput)
4. ✓ Implement caching (60-90% cost savings)
5. ✓ Route by task complexity (10× cost savings)
6. ✓ Stream long responses (better UX)
7. ✓ Analyze cost-latency-quality trade-offs (Pareto optimal)
8. ✓ Monitor production metrics (track improvements)

Production-ready performance requires deliberate optimization across multiple dimensions.