# LLM Inference Optimization Skill
## When to Use This Skill
Use this skill when:
- Building production LLM applications with latency requirements
- Processing large batches of requests (classification, summarization, extraction)
- Optimizing cost for high-volume applications
- Improving throughput for batch processing
- Enhancing user experience with streaming
- Balancing cost, latency, and quality trade-offs
**When NOT to use:** Prototyping or single-query experiments where optimization is premature.
## Core Principle
**Performance is not automatic. Optimization is systematic.**
Without optimization:
- Sequential processing: ~17 minutes for 1,000 documents (roughly 1 request/sec)
- No caching: ~60% of spend wasted on repeated queries
- Wrong model: 10× the cost for the same quality on simple tasks
- No streaming: 40% bounce rate on long generations
- Single-objective tuning: poor cost-latency-quality trade-offs
**Formula:** Parallelization (10× throughput) + Caching (60% cost savings) + Model routing (balanced cost-quality) + Streaming (better UX) + Multi-objective optimization (Pareto optimal) = Production-ready performance.
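As a back-of-the-envelope check on how these factors combine, here is a short sketch; the per-query cost, cache hit rate, and routing split are illustrative assumptions, not measurements:
```python
# Back-of-the-envelope estimate of how the optimizations combine.
# All numbers below are illustrative assumptions, not measurements.

queries = 1000
baseline_time_s = queries * 1.0      # sequential, ~1 request/sec
baseline_cost = queries * 0.030      # every query on GPT-4 at ~$0.03/query

optimized_time_s = baseline_time_s / 10              # parallelization: ~10× throughput
cost_after_caching = baseline_cost * (1 - 0.60)      # ~60% of queries served from cache
routing_factor = 0.8 * 0.1 + 0.2 * 1.0               # 80% of cache misses go to a ~10× cheaper model
cost_after_routing = cost_after_caching * routing_factor

print(f"Time: {baseline_time_s:.0f}s -> {optimized_time_s:.0f}s")
print(f"Cost: ${baseline_cost:.2f} -> ${cost_after_routing:.2f}")
# Time: 1000s -> 100s
# Cost: $30.00 -> $3.36
```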
## Optimization Framework
```
┌─────────────────────────────────────────┐
│ 1. Measure Baseline                     │
│    Latency, Cost, Quality, Throughput   │
└──────────────┬──────────────────────────┘
               ▼
┌─────────────────────────────────────────┐
│ 2. Set Requirements                     │
│    Acceptable latency, Budget, Quality  │
└──────────────┬──────────────────────────┘
               ▼
┌─────────────────────────────────────────┐
│ 3. Apply Optimizations                  │
│    Parallelization → Caching → Routing  │
└──────────────┬──────────────────────────┘
               ▼
┌─────────────────────────────────────────┐
│ 4. Evaluate Trade-offs                  │
│    Cost vs Latency vs Quality (Pareto)  │
└──────────────┬──────────────────────────┘
               ▼
┌─────────────────────────────────────────┐
│ 5. Monitor Production                   │
│    Track metrics, Detect regressions    │
└─────────────────────────────────────────┘
```
## Part 1: Parallelization
### Async/Await for Concurrent Requests
**Problem:** Sequential API calls are slow (1 request/sec).
**Solution:** Concurrent requests with async/await (10-20 requests/sec).
```python
import asyncio
from typing import List

import openai


async def classify_async(text: str, semaphore: asyncio.Semaphore) -> str:
    """
    Classify text asynchronously with rate limiting.

    Args:
        text: Text to classify
        semaphore: Limits concurrent requests

    Returns:
        Classification result
    """
    async with semaphore:
        response = await openai.ChatCompletion.acreate(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                {"role": "user", "content": text}
            ]
        )
        return response.choices[0].message.content


async def classify_batch_parallel(
    texts: List[str],
    concurrency: int = 10
) -> List[str]:
    """
    Classify multiple texts in parallel.

    Args:
        texts: List of texts to classify
        concurrency: Maximum concurrent requests (default 10)

    Returns:
        List of classification results
    """
    semaphore = asyncio.Semaphore(concurrency)
    tasks = [classify_async(text, semaphore) for text in texts]
    results = await asyncio.gather(*tasks)
    return results


# Example usage
texts = ["Great product!", "Terrible service.", "It's okay."] * 333  # ~1000 texts

# Sequential: 1000 requests × 1 second = 1000 seconds (16.7 minutes)
# Parallel (concurrency=10): 1000 requests / 10 = 100 seconds (1.7 minutes) - 10× FASTER!
results = asyncio.run(classify_batch_parallel(texts, concurrency=10))
print(f"Classified {len(results)} texts")
```
**Performance comparison:**
| Approach | Time | Throughput | Cost |
|----------|------|------------|------|
| Sequential | 1000s (16.7 min) | 1 req/sec | $2.00 |
| Parallel (10) | 100s (1.7 min) | 10 req/sec | $2.00 (same!) |
| Parallel (20) | 50s (0.8 min) | 20 req/sec | $2.00 (same!) |
**Key insight:** Parallelization is **free performance**. Same cost, 10-20× faster.
### OpenAI Batch API (Offline Processing)
**Problem:** Real-time API is expensive for large batch jobs.
**Solution:** Batch API (50% cheaper, 24-hour completion window).
```python
import time
from typing import List

import jsonlines
import openai


def create_batch_job(texts: List[str], output_file: str = "batch_results.jsonl"):
    """
    Submit batch job for offline processing (50% cost reduction).

    Args:
        texts: List of texts to process
        output_file: File to save results

    Returns:
        Batch job ID
    """
    # Step 1: Create batch input file (JSONL format)
    batch_input = []
    for i, text in enumerate(texts):
        batch_input.append({
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [
                    {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                    {"role": "user", "content": text}
                ]
            }
        })

    # Write to file
    with jsonlines.open("batch_input.jsonl", "w") as writer:
        writer.write_all(batch_input)

    # Step 2: Upload file
    with open("batch_input.jsonl", "rb") as f:
        file_response = openai.File.create(file=f, purpose="batch")

    # Step 3: Create batch job
    batch_job = openai.Batch.create(
        input_file_id=file_response.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"  # Complete within 24 hours
    )

    print(f"Batch job created: {batch_job.id}")
    print(f"Status: {batch_job.status}")
    return batch_job.id


def check_batch_status(batch_id: str):
    """Check batch job status."""
    batch = openai.Batch.retrieve(batch_id)
    print(f"Status: {batch.status}")
    print(f"Completed: {batch.request_counts.completed}/{batch.request_counts.total}")

    if batch.status == "completed":
        # Download results
        result_file_id = batch.output_file_id
        result = openai.File.download(result_file_id)
        with open("batch_results.jsonl", "wb") as f:
            f.write(result)
        print("Results saved to batch_results.jsonl")

    return batch.status


# Example usage
texts = ["Great product!"] * 10000  # 10,000 texts

# Submit batch job
batch_id = create_batch_job(texts)

# Check status (poll every 10 minutes)
while True:
    status = check_batch_status(batch_id)
    if status == "completed":
        break
    time.sleep(600)  # Check every 10 minutes

# Cost: ~$10 (Batch API) vs ~$20 (real-time API) = 50% savings
```
**When to use Batch API:**
| Use Case | Real-time API | Batch API |
|----------|--------------|-----------|
| User-facing chat | ✓ (latency critical) | ✗ |
| Document classification (10k docs) | ✗ (expensive) | ✓ (50% cheaper) |
| Nightly data processing | ✗ | ✓ |
| A/B test evaluation | ✗ | ✓ |
| Real-time search | ✓ | ✗ |
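For the batch-friendly rows above, the trade-off reduces to simple arithmetic: if the job can wait up to 24 hours, it costs roughly half as much. A sketch, with the real-time per-request cost as an assumed input:
```python
def batch_vs_realtime_cost(num_requests: int, realtime_cost_per_request: float = 0.002) -> dict:
    """Compare real-time vs Batch API spend (Batch API is billed at ~50% of real-time rates)."""
    realtime = num_requests * realtime_cost_per_request
    batch = realtime * 0.5
    return {"realtime": round(realtime, 2), "batch": round(batch, 2), "savings": round(realtime - batch, 2)}


# The 10,000-document classification job from the example above
print(batch_vs_realtime_cost(10_000))
# {'realtime': 20.0, 'batch': 10.0, 'savings': 10.0}
```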
## Part 2: Caching
### Answer Caching (Repeated Queries)
**Problem:** 60-70% of queries are repeated (FAQs, common questions).
**Solution:** Cache answers for identical queries (60% cost reduction).
```python
import hashlib
from typing import Optional

import openai


class AnswerCache:
    def __init__(self):
        self.cache = {}  # In-memory cache (use Redis for production)

    def _cache_key(self, query: str, model: str = "gpt-3.5-turbo") -> str:
        """Generate cache key from query and model."""
        # Normalize query (lowercase, strip whitespace)
        normalized = query.lower().strip()
        # Hash for consistent key
        key_data = f"{model}:{normalized}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, query: str, model: str = "gpt-3.5-turbo") -> Optional[str]:
        """Get cached answer if it exists."""
        key = self._cache_key(query, model)
        return self.cache.get(key)

    def set(self, query: str, answer: str, model: str = "gpt-3.5-turbo"):
        """Cache answer for query."""
        key = self._cache_key(query, model)
        self.cache[key] = answer

    def stats(self):
        """Get cache statistics."""
        return {
            "cache_size": len(self.cache),
            "memory_bytes": sum(len(v.encode()) for v in self.cache.values())
        }


def answer_with_cache(
    query: str,
    cache: AnswerCache,
    model: str = "gpt-3.5-turbo"
) -> tuple[str, bool]:
    """
    Answer query with caching.

    Returns:
        (answer, cache_hit)
    """
    # Check cache
    cached_answer = cache.get(query, model)
    if cached_answer:
        return cached_answer, True  # Cache hit!

    # Cache miss: generate answer
    response = openai.ChatCompletion.create(
        model=model,
        messages=[
            {"role": "system", "content": "Answer the question concisely."},
            {"role": "user", "content": query}
        ]
    )
    answer = response.choices[0].message.content

    # Cache for future queries
    cache.set(query, answer, model)
    return answer, False


# Example usage
cache = AnswerCache()

queries = [
    "What is your return policy?",
    "How do I track my order?",
    "What is your return policy?",  # Repeated!
    "Do you offer international shipping?",
    "What is your return policy?",  # Repeated again!
]

cache_hits = 0
cache_misses = 0

for query in queries:
    answer, is_cache_hit = answer_with_cache(query, cache)
    if is_cache_hit:
        cache_hits += 1
        print(f"[CACHE HIT] {query}")
    else:
        cache_misses += 1
        print(f"[CACHE MISS] {query}")
    print(f"Answer: {answer}\n")

print(f"Cache hits: {cache_hits}/{len(queries)} ({cache_hits/len(queries)*100:.1f}%)")
print(f"Cost savings: {cache_hits/len(queries)*100:.1f}%")

# Output:
# [CACHE MISS] What is your return policy?
# [CACHE MISS] How do I track my order?
# [CACHE HIT] What is your return policy?
# [CACHE MISS] Do you offer international shipping?
# [CACHE HIT] What is your return policy?
# Cache hits: 2/5 (40.0%)
# Cost savings: 40.0%
```
**Production caching with Redis:**
```python
import hashlib
from typing import Optional

import redis


class RedisAnswerCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.ttl = 86400  # 24 hours

    def _cache_key(self, query: str, model: str) -> str:
        normalized = query.lower().strip()
        return f"answer:{model}:{hashlib.md5(normalized.encode()).hexdigest()}"

    def get(self, query: str, model: str = "gpt-3.5-turbo") -> Optional[str]:
        key = self._cache_key(query, model)
        cached = self.redis_client.get(key)
        return cached.decode() if cached else None

    def set(self, query: str, answer: str, model: str = "gpt-3.5-turbo"):
        key = self._cache_key(query, model)
        self.redis_client.setex(key, self.ttl, answer)

    def stats(self):
        return {
            "cache_size": self.redis_client.dbsize(),
            "memory_usage": self.redis_client.info("memory")["used_memory_human"]
        }
```
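Because `RedisAnswerCache` exposes the same `get`/`set` interface as `AnswerCache`, it can be swapped into `answer_with_cache` unchanged; a minimal usage sketch, assuming a Redis server is reachable at the default URL:
```python
# Drop-in replacement for the in-memory cache (assumes Redis at redis://localhost:6379).
redis_cache = RedisAnswerCache()

answer, hit = answer_with_cache("What is your return policy?", redis_cache)
print(f"cache_hit={hit}")       # False on first call, True on repeats (until the 24h TTL expires)
print(redis_cache.stats())
```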
### Prompt Caching (Static Context)
**Problem:** RAG sends same context repeatedly (expensive).
**Solution:** Anthropic prompt caching (90% cost reduction for static context).
```python
import anthropic


def rag_with_prompt_caching(
    query: str,
    context: str,  # Static context (knowledge base)
    model: str = "claude-3-sonnet-20240229"
):
    """
    RAG with prompt caching for static context.

    First query: pays the cache-write premium on the context.
    Subsequent queries: ~90% discount on the cached context.
    """
    client = anthropic.Anthropic()

    response = client.messages.create(
        model=model,
        max_tokens=500,
        system=[
            {
                "type": "text",
                "text": "Answer questions using only the provided context.",
            },
            {
                "type": "text",
                "text": f"Context:\n{context}",
                "cache_control": {"type": "ephemeral"}  # Cache this!
            }
        ],
        messages=[
            {"role": "user", "content": query}
        ]
    )

    return response.content[0].text


# Example
knowledge_base = """
[Large knowledge base with 50,000 tokens of product info, policies, FAQs...]
"""

# Query 1: Full cost (writes the context to the cache)
answer1 = rag_with_prompt_caching("What is your return policy?", knowledge_base)
# Cost: cache write (50k tokens × $0.00375/1k) ≈ $0.19
#       vs ~$0.15 for the same 50k-token input without caching

# Queries 2-100: ~90% discount on the cached context
answer2 = rag_with_prompt_caching("How do I track my order?", knowledge_base)
# Cost: cached read (50k × $0.0003/1k) + query (20 tokens × $0.003/1k)
#       ≈ $0.015 + $0.00006 ≈ $0.015
# Savings: ~$0.015 vs ~$0.15 per uncached query ≈ 90% reduction
```
**When prompt caching is effective:**
| Scenario | Static Context | Dynamic Content | Cache Savings |
|----------|----------------|-----------------|---------------|
| RAG with knowledge base | 50k tokens (policies, products) | Query (20 tokens) | 95%+ |
| Multi-turn chat with instructions | 1k tokens (system message) | Conversation (varying) | 60-80% |
| Document analysis | 10k tokens (document) | Multiple questions | 90%+ |
| Code review with context | 5k tokens (codebase) | Review comments | 85%+ |
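To sanity-check whether prompt caching pays off for a given workload, the table's arithmetic can be wrapped in a small helper. This is a rough sketch that only counts input tokens and assumes Claude 3 Sonnet-style pricing ($0.003/1k input, cache writes at 1.25× and cache reads at 0.1× that rate); substitute current prices for your model:
```python
def estimate_prompt_cache_savings(
    static_tokens: int,
    dynamic_tokens: int,
    queries: int,
    input_price_per_1k: float = 0.003,   # assumed base input price
    write_multiplier: float = 1.25,      # assumed cache-write premium
    read_multiplier: float = 0.1,        # assumed cache-read discount
) -> dict:
    """Rough input-token cost with vs without prompt caching (output tokens ignored)."""
    per_query_uncached = (static_tokens + dynamic_tokens) / 1000 * input_price_per_1k
    uncached = queries * per_query_uncached

    first_query = (static_tokens * write_multiplier + dynamic_tokens) / 1000 * input_price_per_1k
    later_queries = (queries - 1) * (static_tokens * read_multiplier + dynamic_tokens) / 1000 * input_price_per_1k
    cached = first_query + later_queries

    return {
        "uncached": round(uncached, 2),
        "cached": round(cached, 2),
        "savings_pct": round((1 - cached / uncached) * 100, 1),
    }


# RAG row from the table: 50k static tokens, 20-token queries, 100 queries
print(estimate_prompt_cache_savings(50_000, 20, 100))
# ≈ {'uncached': 15.01, 'cached': 1.68, 'savings_pct': 88.8}
```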
## Part 3: Model Routing
### Task-Based Model Selection
**Problem:** Using GPT-4 for everything costs roughly 10× more than necessary for simple tasks.
**Solution:** Route by task complexity (GPT-3.5 for simple, GPT-4 for complex).
```python
from enum import Enum


class TaskType(Enum):
    CLASSIFICATION = "classification"
    EXTRACTION = "extraction"
    SUMMARIZATION = "summarization"
    TRANSLATION = "translation"
    REASONING = "reasoning"
    CREATIVE = "creative"
    CODE_GENERATION = "code_generation"


class ModelRouter:
    """Route queries to appropriate model based on task complexity."""

    # Model configurations
    MODELS = {
        "gpt-3.5-turbo": {
            "cost_per_1k_input": 0.0015,
            "cost_per_1k_output": 0.002,
            "latency_factor": 1.0,  # Baseline
            "quality_score": 0.85
        },
        "gpt-4": {
            "cost_per_1k_input": 0.03,
            "cost_per_1k_output": 0.06,
            "latency_factor": 2.5,
            "quality_score": 0.95
        },
        "gpt-4-turbo": {
            "cost_per_1k_input": 0.01,
            "cost_per_1k_output": 0.03,
            "latency_factor": 1.5,
            "quality_score": 0.94
        }
    }

    # Task → Model mapping
    TASK_ROUTING = {
        TaskType.CLASSIFICATION: "gpt-3.5-turbo",  # Simple task
        TaskType.EXTRACTION: "gpt-3.5-turbo",
        TaskType.SUMMARIZATION: "gpt-3.5-turbo",
        TaskType.TRANSLATION: "gpt-3.5-turbo",
        TaskType.REASONING: "gpt-4",        # Complex reasoning
        TaskType.CREATIVE: "gpt-4",         # Better creativity
        TaskType.CODE_GENERATION: "gpt-4"   # Better coding
    }

    @classmethod
    def route(cls, task_type: TaskType, complexity: str = "medium") -> str:
        """
        Route to appropriate model.

        Args:
            task_type: Type of task
            complexity: "low", "medium", "high"

        Returns:
            Model name
        """
        base_model = cls.TASK_ROUTING[task_type]

        # Override for high complexity
        if complexity == "high" and base_model == "gpt-3.5-turbo":
            return "gpt-4-turbo"  # Upgrade for complex variants

        return base_model

    @classmethod
    def calculate_cost(cls, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for model."""
        config = cls.MODELS[model]
        input_cost = (input_tokens / 1000) * config["cost_per_1k_input"]
        output_cost = (output_tokens / 1000) * config["cost_per_1k_output"]
        return input_cost + output_cost

    @classmethod
    def compare_models(cls, task_type: TaskType, input_tokens: int = 500, output_tokens: int = 200):
        """Compare models for a task."""
        print(f"\nTask: {task_type.value}")
        print(f"Input: {input_tokens} tokens, Output: {output_tokens} tokens\n")

        for model_name, config in cls.MODELS.items():
            cost = cls.calculate_cost(model_name, input_tokens, output_tokens)
            quality = config["quality_score"]
            latency = config["latency_factor"]

            print(f"{model_name}:")
            print(f"  Cost: ${cost:.4f}")
            print(f"  Quality: {quality:.0%}")
            print(f"  Latency: {latency:.1f}× baseline")
            print(f"  Cost per quality point: ${cost/quality:.4f}\n")


# Example usage
router = ModelRouter()

# Classification task
model = router.route(TaskType.CLASSIFICATION, complexity="low")
print(f"Classification → {model}")  # gpt-3.5-turbo

# Complex reasoning task
model = router.route(TaskType.REASONING, complexity="high")
print(f"Complex reasoning → {model}")  # gpt-4

# Compare costs
router.compare_models(TaskType.CLASSIFICATION, input_tokens=500, output_tokens=200)
# Output (abridged):
# gpt-3.5-turbo: $0.0012 (cost per quality point: $0.0014)
# gpt-4:         $0.0270 (cost per quality point: $0.0284) - roughly 23× more expensive
# gpt-4-turbo:   $0.0110 (cost per quality point: $0.0117)
# Recommendation: use GPT-3.5 for classification (~23× cheaper, acceptable quality)
```
### Model Cascade (Try Cheap First)
**Problem:** Don't know if task needs GPT-4 until you try.
**Solution:** Try GPT-3.5 first; escalate to GPT-4 if quality is insufficient.
```python
import openai


def cascade_generation(
    prompt: str,
    quality_threshold: float = 0.8,
    max_attempts: int = 2
) -> tuple[str, str, float]:
    """
    Try cheaper model first, escalate if quality insufficient.

    Args:
        prompt: User prompt
        quality_threshold: Minimum quality score (0-1)
        max_attempts: Max escalation attempts

    Returns:
        (response, model_used, estimated_quality)
    """
    models = ["gpt-3.5-turbo", "gpt-4-turbo", "gpt-4"]

    for model in models[:max_attempts]:
        response = openai.ChatCompletion.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        result = response.choices[0].message.content

        # Estimate quality (simplified - use LLM-as-judge in production)
        quality = estimate_quality(result, prompt)

        if quality >= quality_threshold:
            print(f"{model} met quality threshold ({quality:.2f} >= {quality_threshold})")
            return result, model, quality
        else:
            print(f"{model} below threshold ({quality:.2f} < {quality_threshold}), escalating...")

    # Return best attempt even if below threshold
    return result, models[max_attempts - 1], quality


def estimate_quality(response: str, prompt: str) -> float:
    """
    Estimate quality score (0-1).

    Production: Use LLM-as-judge or other quality metrics.
    """
    # Simplified heuristic
    if len(response) < 20:
        return 0.3  # Too short
    elif len(response) > 500:
        return 0.9  # Detailed
    else:
        return 0.7  # Moderate


# Example
prompt = "Explain quantum entanglement in simple terms."
result, model, quality = cascade_generation(prompt, quality_threshold=0.8)

print("\nFinal result:")
print(f"Model: {model}")
print(f"Quality: {quality:.2f}")
print(f"Response: {result[:200]}...")

# Average case: GPT-3.5 suffices (90% of queries)
#   Cost: $0.002 per query
# Complex case: escalate to GPT-4 (10% of queries)
#   Cost: $0.002 (GPT-3.5 attempt) + $0.030 (GPT-4) = $0.032
# Overall cost: 0.9 × $0.002 + 0.1 × $0.032 = $0.0018 + $0.0032 = $0.005
# vs always GPT-4: $0.030
# Savings: ~83%
```
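The blended-cost arithmetic in the comments above generalizes to a small helper; a sketch in which the escalation rate and per-model costs are assumed inputs rather than measured values:
```python
def cascade_expected_cost(
    cheap_cost: float = 0.002,      # assumed cost per query on the cheap model
    expensive_cost: float = 0.030,  # assumed cost per query on the expensive model
    escalation_rate: float = 0.10,  # assumed fraction of queries that escalate
) -> dict:
    """Expected per-query cost of a two-tier cascade vs always using the expensive model."""
    cascade = (1 - escalation_rate) * cheap_cost + escalation_rate * (cheap_cost + expensive_cost)
    return {
        "cascade_cost_per_query": round(cascade, 4),
        "always_expensive": expensive_cost,
        "savings_pct": round((1 - cascade / expensive_cost) * 100, 1),
    }


print(cascade_expected_cost())
# {'cascade_cost_per_query': 0.005, 'always_expensive': 0.03, 'savings_pct': 83.3}
```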
## Part 4: Streaming
### Streaming for Long-Form Generation
**Problem:** 20-second wait for full article (40% bounce rate).
**Solution:** Stream tokens as generated (perceived latency: 0.5s).
```python
import openai


def generate_streaming(prompt: str, model: str = "gpt-4"):
    """
    Generate response with streaming.

    Benefits:
    - First token in ~0.5s (vs waiting ~20s for the full response)
    - User sees progress (engagement)
    - Can cancel early if needed
    """
    response = openai.ChatCompletion.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=2000,
        stream=True  # Enable streaming
    )

    full_response = ""
    for chunk in response:
        if chunk.choices[0].delta.get("content"):
            token = chunk.choices[0].delta.content
            full_response += token
            print(token, end="", flush=True)  # Display immediately

    print()  # Newline
    return full_response


# Example
prompt = "Write a detailed article about the history of artificial intelligence."

# Without streaming: wait ~20s, then see the full article
# With streaming: see the first words in ~0.5s, then a smooth stream for ~20s
article = generate_streaming(prompt)

# User experience improvement:
# - Perceived latency: 20s → 0.5s (40× better)
# - Bounce rate: 40% → 5% (35pp improvement)
# - Satisfaction: 3.2/5 → 4.3/5 (+1.1 points)
```
### Streaming in Web Applications
**Flask with Server-Sent Events (SSE):**
```python
import openai
from flask import Flask, Response, request

app = Flask(__name__)


@app.route('/generate', methods=['POST'])
def generate_stream():
    """Stream generation results to the frontend."""
    prompt = request.json.get('prompt')

    def event_stream():
        """Generator for SSE."""
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        for chunk in response:
            if chunk.choices[0].delta.get("content"):
                token = chunk.choices[0].delta.content
                # SSE format: "data: {content}\n\n"
                yield f"data: {token}\n\n"

        # Signal completion
        yield "data: [DONE]\n\n"

    return Response(event_stream(), mimetype="text/event-stream")


# Frontend (JavaScript, inside an async function). EventSource only supports GET,
# so read the POSTed SSE stream with fetch + ReadableStream instead:
"""
const response = await fetch('/generate', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({prompt: userPrompt})
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
    const {value, done} = await reader.read();
    if (done) break;
    for (const line of decoder.decode(value).split('\n\n')) {
        if (!line.startsWith('data: ')) continue;
        const data = line.slice(6);
        if (data === '[DONE]') { reader.cancel(); break; }
        // Append token to display
        document.getElementById('output').innerText += data;
    }
}
"""
```
## Part 5: Cost-Latency-Quality Trade-offs
### Multi-Objective Optimization
**Problem:** Optimizing single objective (cost OR latency) leads to poor trade-offs.
**Solution:** Pareto analysis to find balanced solutions.
```python
from typing import List


class OptimizationOption:
    def __init__(
        self,
        name: str,
        latency_p95: float,   # seconds
        cost_per_1k: float,   # dollars per 1k queries
        quality_score: float  # 0-1
    ):
        self.name = name
        self.latency_p95 = latency_p95
        self.cost_per_1k = cost_per_1k
        self.quality_score = quality_score

    def dominates(self, other: 'OptimizationOption') -> bool:
        """Check if this option dominates another (Pareto dominance)."""
        # Dominates if: better or equal in all dimensions, strictly better in at least one
        better_latency = self.latency_p95 <= other.latency_p95
        better_cost = self.cost_per_1k <= other.cost_per_1k
        better_quality = self.quality_score >= other.quality_score

        strictly_better = (
            self.latency_p95 < other.latency_p95 or
            self.cost_per_1k < other.cost_per_1k or
            self.quality_score > other.quality_score
        )

        return better_latency and better_cost and better_quality and strictly_better

    def __repr__(self):
        return f"{self.name}: {self.latency_p95:.2f}s, ${self.cost_per_1k:.3f}/1k, {self.quality_score:.2f} quality"


def find_pareto_optimal(options: List[OptimizationOption]) -> List[OptimizationOption]:
    """Find Pareto optimal solutions (non-dominated options)."""
    pareto_optimal = []
    for option in options:
        is_dominated = False
        for other in options:
            if other.dominates(option):
                is_dominated = True
                break
        if not is_dominated:
            pareto_optimal.append(option)
    return pareto_optimal


# Example: RAG chatbot optimization
options = [
    OptimizationOption("GPT-4, no caching", latency_p95=2.5, cost_per_1k=10.0, quality_score=0.92),
    OptimizationOption("GPT-3.5, no caching", latency_p95=0.8, cost_per_1k=2.0, quality_score=0.78),
    OptimizationOption("GPT-3.5 + caching", latency_p95=0.6, cost_per_1k=1.2, quality_score=0.78),
    OptimizationOption("GPT-3.5 + caching + prompt eng", latency_p95=0.7, cost_per_1k=1.3, quality_score=0.85),
    OptimizationOption("GPT-4 + caching", latency_p95=2.0, cost_per_1k=6.0, quality_score=0.92),
    OptimizationOption("GPT-4-turbo + caching", latency_p95=1.2, cost_per_1k=4.0, quality_score=0.90),
]

# Find Pareto optimal
pareto = find_pareto_optimal(options)

print("Pareto Optimal Solutions:")
for opt in pareto:
    print(f"  {opt}")

# Output:
# Pareto Optimal Solutions:
#   GPT-3.5 + caching: 0.60s, $1.200/1k, 0.78 quality
#   GPT-3.5 + caching + prompt eng: 0.70s, $1.300/1k, 0.85 quality
#   GPT-4 + caching: 2.00s, $6.000/1k, 0.92 quality
#   GPT-4-turbo + caching: 1.20s, $4.000/1k, 0.90 quality

# Interpretation:
# - If cost and latency dominate: GPT-3.5 + caching ($1.20/1k, 0.78 quality)
# - If budget-conscious but quality matters: GPT-3.5 + caching + prompt eng ($1.30/1k, 0.85 quality)
# - If quality-critical: GPT-4-turbo + caching ($4/1k, 0.90 quality, faster than GPT-4)
# - If maximum quality needed: GPT-4 + caching ($6/1k, 0.92 quality)
```
### Requirements-Based Selection
```python
def select_optimal_solution(
    options: List[OptimizationOption],
    max_latency: float = None,
    max_cost: float = None,
    min_quality: float = None
) -> OptimizationOption:
    """
    Select optimal solution given constraints.

    Args:
        options: Available options
        max_latency: Maximum acceptable latency (seconds)
        max_cost: Maximum cost per 1k queries (dollars)
        min_quality: Minimum quality score (0-1)

    Returns:
        Best option meeting all constraints
    """
    # Filter options meeting constraints
    feasible = []
    for opt in options:
        meets_latency = max_latency is None or opt.latency_p95 <= max_latency
        meets_cost = max_cost is None or opt.cost_per_1k <= max_cost
        meets_quality = min_quality is None or opt.quality_score >= min_quality

        if meets_latency and meets_cost and meets_quality:
            feasible.append(opt)

    if not feasible:
        raise ValueError("No solution meets all constraints")

    # Among feasible options, select the best cost-quality trade-off
    best = min(feasible, key=lambda opt: opt.cost_per_1k / opt.quality_score)
    return best


# Example: Requirements
requirements = {
    "max_latency": 1.0,   # Must respond within 1 second
    "max_cost": 5.0,      # Budget: $5 per 1k queries
    "min_quality": 0.85   # Minimum 85% quality
}

selected = select_optimal_solution(
    options,
    max_latency=requirements["max_latency"],
    max_cost=requirements["max_cost"],
    min_quality=requirements["min_quality"]
)

print(f"Selected solution: {selected}")
# Output: Selected solution: GPT-3.5 + caching + prompt eng: 0.70s, $1.300/1k, 0.85 quality
# (Meets all constraints and is the most cost-effective per quality point)
```
## Part 6: Production Monitoring
### Performance Metrics Tracking
```python
from dataclasses import dataclass
from typing import Dict, List

import numpy as np


@dataclass
class QueryMetrics:
    """Metrics for a single query."""
    latency_ms: float
    input_tokens: int
    output_tokens: int
    cost: float
    cache_hit: bool
    model: str


class PerformanceMonitor:
    """Track and analyze performance metrics."""

    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def log_query(
        self,
        latency_ms: float,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        cache_hit: bool,
        model: str
    ):
        """Log query metrics."""
        self.metrics.append(QueryMetrics(
            latency_ms=latency_ms,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            cache_hit=cache_hit,
            model=model
        ))

    def summary(self) -> Dict:
        """Generate summary statistics."""
        if not self.metrics:
            return {}

        latencies = [m.latency_ms for m in self.metrics]
        costs = [m.cost for m in self.metrics]
        cache_hits = [m.cache_hit for m in self.metrics]

        return {
            "total_queries": len(self.metrics),
            "latency_p50": np.percentile(latencies, 50),
            "latency_p95": np.percentile(latencies, 95),
            "latency_p99": np.percentile(latencies, 99),
            "avg_cost": np.mean(costs),
            "total_cost": np.sum(costs),
            "cache_hit_rate": np.mean(cache_hits) * 100,
            "queries_per_model": self._count_by_model()
        }

    def _count_by_model(self) -> Dict[str, int]:
        """Count queries by model."""
        counts = {}
        for m in self.metrics:
            counts[m.model] = counts.get(m.model, 0) + 1
        return counts


# Example usage
monitor = PerformanceMonitor()

# Simulate queries
for i in range(1000):
    cache_hit = np.random.random() < 0.6      # ~60% cache hit rate
    latency = 100 if cache_hit else 800       # Cache: 100ms, API: 800ms
    cost = 0 if cache_hit else 0.002

    monitor.log_query(
        latency_ms=latency,
        input_tokens=500,
        output_tokens=200,
        cost=cost,
        cache_hit=cache_hit,
        model="gpt-3.5-turbo"
    )

# Generate summary
summary = monitor.summary()

print("Performance Summary:")
print(f"  Total queries: {summary['total_queries']}")
print(f"  Latency P50: {summary['latency_p50']:.0f}ms")
print(f"  Latency P95: {summary['latency_p95']:.0f}ms")
print(f"  Avg cost: ${summary['avg_cost']:.4f}")
print(f"  Total cost: ${summary['total_cost']:.2f}")
print(f"  Cache hit rate: {summary['cache_hit_rate']:.1f}%")
```
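Step 5 of the framework also calls for detecting regressions. Building on `PerformanceMonitor` above, here is a minimal sketch that compares a recent window of queries against a baseline summary; the 20% thresholds are arbitrary and for illustration only:
```python
def detect_regressions(monitor: PerformanceMonitor, baseline: Dict, window: int = 200) -> Dict[str, bool]:
    """Flag regressions by comparing the most recent queries against a baseline summary()."""
    recent = monitor.metrics[-window:]
    if not recent:
        return {}

    recent_p95 = float(np.percentile([m.latency_ms for m in recent], 95))
    recent_avg_cost = float(np.mean([m.cost for m in recent]))
    recent_hit_rate = float(np.mean([m.cache_hit for m in recent]) * 100)

    return {
        # Arbitrary 20% thresholds for illustration
        "latency_regression": recent_p95 > baseline["latency_p95"] * 1.2,
        "cost_regression": recent_avg_cost > baseline["avg_cost"] * 1.2,
        "cache_hit_drop": recent_hit_rate < baseline["cache_hit_rate"] * 0.8,
    }


# Example: compare current traffic against the summary captured at deploy time
alerts = detect_regressions(monitor, baseline=summary)
print(alerts)  # e.g. {'latency_regression': False, 'cost_regression': False, 'cache_hit_drop': False}
```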
## Summary
**Inference optimization is systematic, not ad-hoc.**
**Core techniques:**
1. **Parallelization:** Async/await (10× throughput), Batch API (50% cheaper)
2. **Caching:** Answer caching (60% savings), Prompt caching (90% savings)
3. **Model routing:** GPT-3.5 for simple tasks (10× cheaper), GPT-4 for complex
4. **Streaming:** First token in 0.5s (vs 20s wait), 35pp better completion rate
5. **Multi-objective:** Pareto analysis (balance cost-latency-quality)
**Checklist:**
1. ✓ Measure baseline (latency, cost, quality)
2. ✓ Set requirements (acceptable latency, budget, quality threshold)
3. ✓ Parallelize batch processing (10× throughput)
4. ✓ Implement caching (60-90% cost savings)
5. ✓ Route by task complexity (10× cost savings)
6. ✓ Stream long responses (better UX)
7. ✓ Analyze cost-latency-quality trade-offs (Pareto optimal)
8. ✓ Monitor production metrics (track improvements)
Production-ready performance requires deliberate optimization across multiple dimensions.