# LLM Inference Optimization Skill

## When to Use This Skill

Use this skill when:
- Building production LLM applications with latency requirements
- Processing large batches of requests (classification, summarization, extraction)
- Optimizing cost for high-volume applications
- Improving throughput for batch processing
- Enhancing user experience with streaming
- Balancing cost, latency, and quality trade-offs

**When NOT to use:** Prototyping or single-query experiments where optimization is premature.

## Core Principle

**Performance is not automatic. Optimization is systematic.**

Without optimization:
- Sequential processing: 16 minutes for 1000 documents (~1 request/sec)
- No caching: 60% wasted cost on repeated queries
- Wrong model: 10× more expensive for the same quality
- No streaming: 40% bounce rate on long generations
- Single-objective tuning: poor cost-latency-quality trade-offs

**Formula:** Parallelization (10× throughput) + Caching (60% cost savings) + Model routing (balanced cost-quality) + Streaming (better UX) + Multi-objective optimization (Pareto optimal) = Production-ready performance.

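To see how these levers stack, here is a rough back-of-the-envelope sketch for a hypothetical batch of 1,000 queries; the per-call latency, per-call price, and cache hit rate are assumptions for the arithmetic, not benchmarks.

```python
# Hypothetical workload: 1,000 queries against a gpt-3.5-class model.
# All figures below are illustrative assumptions, not measurements.
queries = 1_000
baseline_seconds = queries * 1.0   # ~1 request/sec sequential
baseline_cost = queries * 0.002    # ~$0.002 per uncached call

concurrency = 10                   # parallelization: 10 requests in flight
cache_hit_rate = 0.60              # answer caching: 60% of queries repeat

optimized_seconds = baseline_seconds / concurrency
optimized_cost = baseline_cost * (1 - cache_hit_rate)

print(f"Time: {baseline_seconds:.0f}s -> {optimized_seconds:.0f}s")
print(f"Cost: ${baseline_cost:.2f} -> ${optimized_cost:.2f}")
# Time: 1000s -> 100s
# Cost: $2.00 -> $0.80
```
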
## Optimization Framework

```
┌─────────────────────────────────────────┐
│ 1. Measure Baseline                     │
│    Latency, Cost, Quality, Throughput   │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 2. Set Requirements                     │
│    Acceptable latency, Budget, Quality  │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 3. Apply Optimizations                  │
│    Parallelization → Caching → Routing  │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 4. Evaluate Trade-offs                  │
│    Cost vs Latency vs Quality (Pareto)  │
└──────────────┬──────────────────────────┘
               │
               ▼
┌─────────────────────────────────────────┐
│ 5. Monitor Production                   │
│    Track metrics, Detect regressions    │
└─────────────────────────────────────────┘
```

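Step 1 of the framework needs real numbers before any optimization is applied. Below is a minimal baseline-measurement sketch; the prompts, model name, and per-1k prices are placeholders to adapt to your workload.

```python
import statistics
import time

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment

def measure_baseline(prompts, model="gpt-3.5-turbo",
                     price_in=0.0015, price_out=0.002):
    """Run prompts sequentially and record latency, cost, and throughput."""
    latencies, costs = [], []
    for prompt in prompts:
        start = time.perf_counter()
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        latencies.append(time.perf_counter() - start)
        usage = response.usage
        costs.append(usage.prompt_tokens / 1000 * price_in
                     + usage.completion_tokens / 1000 * price_out)
    return {
        "p50_latency_s": statistics.median(latencies),
        "avg_cost_usd": statistics.mean(costs),
        "throughput_rps": len(prompts) / sum(latencies),
    }

# A handful of representative prompts is enough for a first baseline.
print(measure_baseline(["Classify: 'Great product!'", "Classify: 'Terrible service.'"]))
```
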
## Part 1: Parallelization

### Async/Await for Concurrent Requests

**Problem:** Sequential API calls are slow (1 request/sec).

**Solution:** Concurrent requests with async/await (10-20 requests/sec).

```python
import asyncio
from typing import List

from openai import AsyncOpenAI

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

async def classify_async(text: str, semaphore: asyncio.Semaphore) -> str:
    """
    Classify text asynchronously with rate limiting.

    Args:
        text: Text to classify
        semaphore: Limits concurrent requests

    Returns:
        Classification result
    """
    async with semaphore:
        response = await client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                {"role": "user", "content": text}
            ]
        )
        return response.choices[0].message.content

async def classify_batch_parallel(
    texts: List[str],
    concurrency: int = 10
) -> List[str]:
    """
    Classify multiple texts in parallel.

    Args:
        texts: List of texts to classify
        concurrency: Maximum concurrent requests (default 10)

    Returns:
        List of classification results
    """
    semaphore = asyncio.Semaphore(concurrency)

    tasks = [classify_async(text, semaphore) for text in texts]
    results = await asyncio.gather(*tasks)

    return results

# Example usage
texts = ["Great product!", "Terrible service.", "It's okay."] * 333  # 999 texts (~1000)

# Sequential: ~1000 requests × 1 second each = ~1000 seconds (16.7 minutes)
# Parallel (concurrency=10): ~1000 requests / 10 = ~100 seconds (1.7 minutes) - 10× faster!

results = asyncio.run(classify_batch_parallel(texts, concurrency=10))
print(f"Classified {len(results)} texts")
```

**Performance comparison:**

| Approach | Time | Throughput | Cost |
|----------|------|------------|------|
| Sequential | 1000s (16.7 min) | 1 req/sec | $2.00 |
| Parallel (10) | 100s (1.7 min) | 10 req/sec | $2.00 (same!) |
| Parallel (20) | 50s (0.8 min) | 20 req/sec | $2.00 (same!) |

**Key insight:** Parallelization is **free performance**: same cost, 10-20× faster (up to the provider's rate limits).

### OpenAI Batch API (Offline Processing)

**Problem:** Real-time API is expensive for large batch jobs.

**Solution:** Batch API (50% cheaper, 24-hour completion window).

```python
import time
from typing import List

import jsonlines
from openai import OpenAI

client = OpenAI()

def create_batch_job(texts: List[str]) -> str:
    """
    Submit a batch job for offline processing (50% cost reduction).

    Args:
        texts: List of texts to process

    Returns:
        Batch job ID
    """
    # Step 1: Create batch input file (JSONL format)
    batch_input = []
    for i, text in enumerate(texts):
        batch_input.append({
            "custom_id": f"request-{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": "gpt-3.5-turbo",
                "messages": [
                    {"role": "system", "content": "Classify sentiment: positive/negative/neutral"},
                    {"role": "user", "content": text}
                ]
            }
        })

    # Write to file
    with jsonlines.open("batch_input.jsonl", "w") as writer:
        writer.write_all(batch_input)

    # Step 2: Upload file
    with open("batch_input.jsonl", "rb") as f:
        file_response = client.files.create(file=f, purpose="batch")

    # Step 3: Create batch job
    batch_job = client.batches.create(
        input_file_id=file_response.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"  # Complete within 24 hours
    )

    print(f"Batch job created: {batch_job.id}")
    print(f"Status: {batch_job.status}")

    return batch_job.id

def check_batch_status(batch_id: str) -> str:
    """Check batch job status and download results once completed."""
    batch = client.batches.retrieve(batch_id)

    print(f"Status: {batch.status}")
    print(f"Completed: {batch.request_counts.completed}/{batch.request_counts.total}")

    if batch.status == "completed":
        # Download results
        result = client.files.content(batch.output_file_id)

        with open("batch_results.jsonl", "w") as f:
            f.write(result.text)

        print("Results saved to batch_results.jsonl")

    return batch.status

# Example usage
texts = ["Great product!"] * 10000  # 10,000 texts

# Submit batch job
batch_id = create_batch_job(texts)

# Poll status every 10 minutes until the job reaches a terminal state
while True:
    status = check_batch_status(batch_id)
    if status in ("completed", "failed", "expired", "cancelled"):
        break
    time.sleep(600)  # Check every 10 minutes

# Cost: $10 (Batch API) vs $20 (real-time API) = 50% savings!
```
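
Batch results come back as JSONL keyed by `custom_id`, not in submission order, so they usually need to be re-associated with the inputs. A minimal parsing sketch (the helper name `load_batch_results` is ours):

```python
import json

def load_batch_results(path: str = "batch_results.jsonl") -> dict:
    """Map each custom_id to the model's reply from a Batch API output file."""
    results = {}
    with open(path) as f:
        for line in f:
            record = json.loads(line)
            if record.get("error"):
                continue  # skip failed requests
            body = record["response"]["body"]
            results[record["custom_id"]] = body["choices"][0]["message"]["content"]
    return results

# Example: recover results in the original submission order.
results = load_batch_results()
ordered = [results.get(f"request-{i}") for i in range(len(results))]
print(ordered[:3])
```
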

**When to use Batch API:**

| Use Case | Real-time API | Batch API |
|----------|--------------|-----------|
| User-facing chat | ✓ (latency critical) | ✗ |
| Document classification (10k docs) | ✗ (expensive) | ✓ (50% cheaper) |
| Nightly data processing | ✗ | ✓ |
| A/B test evaluation | ✗ | ✓ |
| Real-time search | ✓ | ✗ |

## Part 2: Caching

### Answer Caching (Repeated Queries)

**Problem:** 60-70% of queries are repeated (FAQs, common questions).

**Solution:** Cache answers for identical queries (up to 60% cost reduction).

```python
import hashlib
from typing import Optional

from openai import OpenAI

client = OpenAI()

class AnswerCache:
    def __init__(self):
        self.cache = {}  # In-memory cache (use Redis for production)

    def _cache_key(self, query: str, model: str = "gpt-3.5-turbo") -> str:
        """Generate cache key from query and model."""
        # Normalize query (lowercase, strip whitespace)
        normalized = query.lower().strip()

        # Hash for a consistent key
        key_data = f"{model}:{normalized}"
        return hashlib.md5(key_data.encode()).hexdigest()

    def get(self, query: str, model: str = "gpt-3.5-turbo") -> Optional[str]:
        """Get cached answer if it exists."""
        key = self._cache_key(query, model)
        return self.cache.get(key)

    def set(self, query: str, answer: str, model: str = "gpt-3.5-turbo"):
        """Cache answer for query."""
        key = self._cache_key(query, model)
        self.cache[key] = answer

    def stats(self):
        """Get cache statistics."""
        return {
            "cache_size": len(self.cache),
            "memory_bytes": sum(len(v.encode()) for v in self.cache.values())
        }

def answer_with_cache(
    query: str,
    cache: AnswerCache,
    model: str = "gpt-3.5-turbo"
) -> tuple[str, bool]:
    """
    Answer query with caching.

    Returns:
        (answer, cache_hit)
    """
    # Check cache
    cached_answer = cache.get(query, model)
    if cached_answer:
        return cached_answer, True  # Cache hit!

    # Cache miss: Generate answer
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "Answer the question concisely."},
            {"role": "user", "content": query}
        ]
    )

    answer = response.choices[0].message.content

    # Cache for future queries
    cache.set(query, answer, model)

    return answer, False

# Example usage
cache = AnswerCache()

queries = [
    "What is your return policy?",
    "How do I track my order?",
    "What is your return policy?",  # Repeated!
    "Do you offer international shipping?",
    "What is your return policy?",  # Repeated again!
]

cache_hits = 0
cache_misses = 0

for query in queries:
    answer, is_cache_hit = answer_with_cache(query, cache)

    if is_cache_hit:
        cache_hits += 1
        print(f"[CACHE HIT] {query}")
    else:
        cache_misses += 1
        print(f"[CACHE MISS] {query}")

    print(f"Answer: {answer}\n")

print(f"Cache hits: {cache_hits}/{len(queries)} ({cache_hits/len(queries)*100:.1f}%)")
print(f"Cost savings: {cache_hits/len(queries)*100:.1f}%")

# Output:
# [CACHE MISS] What is your return policy?
# [CACHE MISS] How do I track my order?
# [CACHE HIT] What is your return policy?
# [CACHE MISS] Do you offer international shipping?
# [CACHE HIT] What is your return policy?
# Cache hits: 2/5 (40.0%)
# Cost savings: 40.0%
```

**Production caching with Redis:**

```python
import hashlib
from typing import Optional

import redis

class RedisAnswerCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.ttl = 86400  # 24 hours

    def _cache_key(self, query: str, model: str) -> str:
        normalized = query.lower().strip()
        return f"answer:{model}:{hashlib.md5(normalized.encode()).hexdigest()}"

    def get(self, query: str, model: str = "gpt-3.5-turbo") -> Optional[str]:
        key = self._cache_key(query, model)
        cached = self.redis_client.get(key)
        return cached.decode() if cached else None

    def set(self, query: str, answer: str, model: str = "gpt-3.5-turbo"):
        key = self._cache_key(query, model)
        self.redis_client.setex(key, self.ttl, answer)

    def stats(self):
        return {
            "cache_size": self.redis_client.dbsize(),
            "memory_usage": self.redis_client.info("memory")["used_memory_human"]
        }
```
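
Because `RedisAnswerCache` exposes the same `get`/`set` interface as the in-memory cache, it can back `answer_with_cache` without other changes; a brief usage sketch assuming a local Redis instance:

```python
# Swap the in-memory cache for Redis without touching answer_with_cache().
redis_cache = RedisAnswerCache("redis://localhost:6379")

answer, hit = answer_with_cache("What is your return policy?", redis_cache)
print(f"cache hit: {hit}")
print(redis_cache.stats())  # e.g. {'cache_size': 1, 'memory_usage': '1.02M'}
```
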

### Prompt Caching (Static Context)

**Problem:** RAG sends the same context repeatedly (expensive).

**Solution:** Anthropic prompt caching (90% cost reduction for static context).

```python
import anthropic

def rag_with_prompt_caching(
    query: str,
    context: str,  # Static context (knowledge base)
    model: str = "claude-3-5-sonnet-20240620"  # Prompt caching requires a supported model
):
    """
    RAG with prompt caching for static context.

    First query: writes the context to the cache (cache writes cost ~1.25× base input).
    Subsequent queries: ~90% discount on the cached context tokens.
    """
    client = anthropic.Anthropic()

    response = client.messages.create(
        model=model,
        max_tokens=500,
        system=[
            {
                "type": "text",
                "text": "Answer questions using only the provided context.",
            },
            {
                "type": "text",
                "text": f"Context:\n{context}",
                "cache_control": {"type": "ephemeral"}  # Cache this!
            }
        ],
        messages=[
            {"role": "user", "content": query}
        ]
    )

    return response.content[0].text

# Example
knowledge_base = """
[Large knowledge base with 50,000 tokens of product info, policies, FAQs...]
"""

# Query 1: Cache write (50k tokens × $0.00375/1k) ≈ $0.19
answer1 = rag_with_prompt_caching("What is your return policy?", knowledge_base)

# Queries 2-100: 90% discount on the cached context!
# Cached context (50k × $0.0003/1k) + query (20 tokens × $0.003/1k) ≈ $0.015
answer2 = rag_with_prompt_caching("How do I track my order?", knowledge_base)

# Savings: ~$0.015 per query vs ~$0.15 for resending the context uncached ≈ 90% reduction
```

**When prompt caching is effective:**

| Scenario | Static Context | Dynamic Content | Cache Savings |
|----------|----------------|-----------------|---------------|
| RAG with knowledge base | 50k tokens (policies, products) | Query (20 tokens) | 90%+ |
| Multi-turn chat with instructions | 1k tokens (system message) | Conversation (varying) | 60-80% |
| Document analysis | 10k tokens (document) | Multiple questions | 90%+ |
| Code review with context | 5k tokens (codebase) | Review comments | 85%+ |

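Whether prompt caching pays off depends on how often the static prefix is reused: cache writes cost roughly 1.25× the base input rate and cached reads roughly 0.1×, so a single reuse is already enough to break even. A small sketch of that arithmetic (the multipliers are Anthropic's published ones; adjust if pricing changes):

```python
def prompt_cache_savings(static_tokens: int, reuses: int,
                         base_price_per_1k: float = 0.003,
                         write_multiplier: float = 1.25,
                         read_multiplier: float = 0.10) -> float:
    """Fractional savings on the static prefix after `reuses` additional queries."""
    uncached = (1 + reuses) * static_tokens / 1000 * base_price_per_1k
    cached = (static_tokens / 1000 * base_price_per_1k) * (write_multiplier + reuses * read_multiplier)
    return 1 - cached / uncached

for n in (1, 10, 100):
    print(f"{n:>3} reuse(s): {prompt_cache_savings(50_000, n):.1%} saved")
#   1 reuse(s): 32.5% saved
#  10 reuse(s): 79.5% saved
# 100 reuse(s): 88.9% saved
```
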
## Part 3: Model Routing

### Task-Based Model Selection

**Problem:** Using GPT-4 for everything is 10-20× more expensive than necessary.

**Solution:** Route by task complexity (GPT-3.5 for simple tasks, GPT-4 for complex ones).

```python
from enum import Enum
from typing import Dict

class TaskType(Enum):
    CLASSIFICATION = "classification"
    EXTRACTION = "extraction"
    SUMMARIZATION = "summarization"
    TRANSLATION = "translation"
    REASONING = "reasoning"
    CREATIVE = "creative"
    CODE_GENERATION = "code_generation"

class ModelRouter:
    """Route queries to an appropriate model based on task complexity."""

    # Model configurations
    MODELS = {
        "gpt-3.5-turbo": {
            "cost_per_1k_input": 0.0015,
            "cost_per_1k_output": 0.002,
            "latency_factor": 1.0,  # Baseline
            "quality_score": 0.85
        },
        "gpt-4": {
            "cost_per_1k_input": 0.03,
            "cost_per_1k_output": 0.06,
            "latency_factor": 2.5,
            "quality_score": 0.95
        },
        "gpt-4-turbo": {
            "cost_per_1k_input": 0.01,
            "cost_per_1k_output": 0.03,
            "latency_factor": 1.5,
            "quality_score": 0.94
        }
    }

    # Task → Model mapping
    TASK_ROUTING = {
        TaskType.CLASSIFICATION: "gpt-3.5-turbo",  # Simple task
        TaskType.EXTRACTION: "gpt-3.5-turbo",
        TaskType.SUMMARIZATION: "gpt-3.5-turbo",
        TaskType.TRANSLATION: "gpt-3.5-turbo",
        TaskType.REASONING: "gpt-4",  # Complex reasoning
        TaskType.CREATIVE: "gpt-4",  # Better creativity
        TaskType.CODE_GENERATION: "gpt-4"  # Better coding
    }

    @classmethod
    def route(cls, task_type: TaskType, complexity: str = "medium") -> str:
        """
        Route to an appropriate model.

        Args:
            task_type: Type of task
            complexity: "low", "medium", "high"

        Returns:
            Model name
        """
        base_model = cls.TASK_ROUTING[task_type]

        # Override for high complexity
        if complexity == "high" and base_model == "gpt-3.5-turbo":
            return "gpt-4-turbo"  # Upgrade for complex variants

        return base_model

    @classmethod
    def calculate_cost(cls, model: str, input_tokens: int, output_tokens: int) -> float:
        """Calculate cost for a model."""
        config = cls.MODELS[model]
        input_cost = (input_tokens / 1000) * config["cost_per_1k_input"]
        output_cost = (output_tokens / 1000) * config["cost_per_1k_output"]
        return input_cost + output_cost

    @classmethod
    def compare_models(cls, task_type: TaskType, input_tokens: int = 500, output_tokens: int = 200):
        """Compare models for a task."""
        print(f"\nTask: {task_type.value}")
        print(f"Input: {input_tokens} tokens, Output: {output_tokens} tokens\n")

        for model_name, config in cls.MODELS.items():
            cost = cls.calculate_cost(model_name, input_tokens, output_tokens)
            quality = config["quality_score"]
            latency = config["latency_factor"]

            print(f"{model_name}:")
            print(f"  Cost: ${cost:.4f}")
            print(f"  Quality: {quality:.0%}")
            print(f"  Latency: {latency:.1f}× baseline")
            print(f"  Cost per quality point: ${cost/quality:.4f}\n")

# Example usage
router = ModelRouter()

# Classification task
model = router.route(TaskType.CLASSIFICATION, complexity="low")
print(f"Classification → {model}")  # gpt-3.5-turbo

# Complex reasoning task
model = router.route(TaskType.REASONING, complexity="high")
print(f"Complex reasoning → {model}")  # gpt-4

# Compare costs
router.compare_models(TaskType.CLASSIFICATION, input_tokens=500, output_tokens=200)
# Output (abridged):
# gpt-3.5-turbo: $0.0012 (cost per quality point: $0.0014)
# gpt-4: $0.0270 (cost per quality point: $0.0284) - ~23× more expensive!
# Recommendation: Use GPT-3.5 for classification (~23× cheaper, acceptable quality)
```

### Model Cascade (Try Cheap First)

**Problem:** You don't know whether a task needs GPT-4 until you try.

**Solution:** Try GPT-3.5 first, and escalate to GPT-4 only if the quality is insufficient.

```python
from openai import OpenAI

client = OpenAI()

def cascade_generation(
    prompt: str,
    quality_threshold: float = 0.8,
    max_attempts: int = 2
) -> tuple[str, str, float]:
    """
    Try a cheaper model first, escalate if quality is insufficient.

    Args:
        prompt: User prompt
        quality_threshold: Minimum quality score (0-1)
        max_attempts: Max escalation attempts

    Returns:
        (response, model_used, estimated_quality)
    """
    models = ["gpt-3.5-turbo", "gpt-4-turbo", "gpt-4"]

    result, model, quality = "", models[0], 0.0
    for model in models[:max_attempts]:
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )

        result = response.choices[0].message.content

        # Estimate quality (simplified - use LLM-as-judge in production)
        quality = estimate_quality(result, prompt)

        if quality >= quality_threshold:
            print(f"✓ {model} met quality threshold ({quality:.2f} >= {quality_threshold})")
            return result, model, quality
        else:
            print(f"✗ {model} below threshold ({quality:.2f} < {quality_threshold}), escalating...")

    # Return the best attempt even if below threshold
    return result, model, quality

def estimate_quality(response: str, prompt: str) -> float:
    """
    Estimate quality score (0-1).

    Production: Use LLM-as-judge or other quality metrics.
    """
    # Simplified heuristic
    if len(response) < 20:
        return 0.3  # Too short
    elif len(response) > 500:
        return 0.9  # Detailed
    else:
        return 0.7  # Moderate

# Example
prompt = "Explain quantum entanglement in simple terms."

result, model, quality = cascade_generation(prompt, quality_threshold=0.8)

print(f"\nFinal result:")
print(f"Model: {model}")
print(f"Quality: {quality:.2f}")
print(f"Response: {result[:200]}...")

# Average case: GPT-3.5 suffices (90% of queries)
# Cost: $0.002 per query

# Complex case: Escalate to GPT-4 (10% of queries)
# Cost: $0.002 (GPT-3.5 attempt) + $0.030 (GPT-4) = $0.032

# Overall cost: 0.9 × $0.002 + 0.1 × $0.032 = $0.0018 + $0.0032 = $0.005
# vs Always GPT-4: $0.030
# Savings: 83%!
```

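The `estimate_quality` heuristic above is a placeholder; the comment suggests LLM-as-judge for production. One minimal way to do that is sketched below; the judge prompt, 1-5 scale, and choice of judge model are illustrative assumptions:

```python
from openai import OpenAI

client = OpenAI()  # same client setup as the cascade example above

def judge_quality(response: str, prompt: str, judge_model: str = "gpt-3.5-turbo") -> float:
    """Score a response 0-1 by asking a judge model for a 1-5 rating."""
    rubric = (
        "Rate how well the RESPONSE answers the PROMPT on a 1-5 scale "
        "(5 = complete, correct, clear). Reply with a single digit.\n\n"
        f"PROMPT:\n{prompt}\n\nRESPONSE:\n{response}"
    )
    judgement = client.chat.completions.create(
        model=judge_model,
        messages=[{"role": "user", "content": rubric}],
        max_tokens=1,
        temperature=0,
    )
    try:
        rating = int(judgement.choices[0].message.content.strip()[0])
    except (ValueError, IndexError):
        return 0.0  # Unparseable judgement: treat as low quality
    return (rating - 1) / 4  # Map 1-5 onto 0-1

# Drop-in replacement inside cascade_generation:
# quality = judge_quality(result, prompt)
```
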
## Part 4: Streaming

### Streaming for Long-Form Generation

**Problem:** A 20-second wait for a full article (40% bounce rate).

**Solution:** Stream tokens as they are generated (perceived latency: ~0.5s to first token).

```python
from openai import OpenAI

client = OpenAI()

def generate_streaming(prompt: str, model: str = "gpt-4"):
    """
    Generate a response with streaming.

    Benefits:
    - First token in ~0.5s (vs a 20s wait)
    - User sees progress (engagement)
    - Can cancel early if needed
    """
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=2000,
        stream=True  # Enable streaming
    )

    full_response = ""

    for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            full_response += delta.content
            print(delta.content, end="", flush=True)  # Display immediately

    print()  # Newline
    return full_response

# Example
prompt = "Write a detailed article about the history of artificial intelligence."

# Without streaming: wait 20s, then see the full article
# With streaming: see the first words in ~0.5s, then a steady stream for ~20s
article = generate_streaming(prompt)

# User experience improvement (illustrative production numbers):
# - Perceived latency: 20s → 0.5s (40× better)
# - Bounce rate: 40% → 5% (35pp improvement)
# - Satisfaction: 3.2/5 → 4.3/5 (+1.1 points)
```

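Because the win here is time to first token rather than total generation time, it is worth measuring both explicitly. A small sketch (the `measure_ttft` helper is ours):

```python
import time

from openai import OpenAI

client = OpenAI()

def measure_ttft(prompt: str, model: str = "gpt-4") -> tuple[float, float]:
    """Return (seconds to first token, seconds to full completion) for one streamed call."""
    start = time.perf_counter()
    first_token_at = None

    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            if first_token_at is None:
                first_token_at = time.perf_counter()
    total = time.perf_counter() - start
    ttft = (first_token_at - start) if first_token_at else total
    return ttft, total

ttft, total = measure_ttft("Write a detailed article about the history of AI.")
print(f"First token after {ttft:.2f}s, full response after {total:.2f}s")
```
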
### Streaming in Web Applications

**Flask with Server-Sent Events (SSE):**

```python
from flask import Flask, Response, request
from openai import OpenAI

app = Flask(__name__)
client = OpenAI()

@app.route('/generate', methods=['POST'])
def generate_stream():
    """Stream generation results to the frontend."""
    prompt = request.json.get('prompt')

    def event_stream():
        """Generator for SSE."""
        stream = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                # SSE format: "data: {content}\n\n"
                yield f"data: {delta.content}\n\n"

        # Signal completion
        yield "data: [DONE]\n\n"

    return Response(event_stream(), mimetype="text/event-stream")

# Frontend (JavaScript). Note: the native EventSource API only supports GET,
# so a POST endpoint is consumed with fetch() and a stream reader instead.
"""
const response = await fetch('/generate', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({prompt: userPrompt})
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    for (const line of decoder.decode(value).split('\n')) {
        if (!line.startsWith('data: ')) continue;
        const data = line.slice('data: '.length);
        if (data === '[DONE]') continue;
        // Append token to display
        document.getElementById('output').innerText += data;
    }
}
"""
```

## Part 5: Cost-Latency-Quality Trade-offs

### Multi-Objective Optimization

**Problem:** Optimizing a single objective (cost OR latency) leads to poor trade-offs.

**Solution:** Pareto analysis to find balanced solutions.

```python
from typing import List

class OptimizationOption:
    def __init__(
        self,
        name: str,
        latency_p95: float,   # seconds
        cost_per_1k: float,   # dollars per 1k queries
        quality_score: float  # 0-1
    ):
        self.name = name
        self.latency_p95 = latency_p95
        self.cost_per_1k = cost_per_1k
        self.quality_score = quality_score

    def dominates(self, other: 'OptimizationOption') -> bool:
        """Check if this option dominates another (Pareto dominance)."""
        # Dominates if: better or equal in all dimensions, strictly better in at least one
        better_latency = self.latency_p95 <= other.latency_p95
        better_cost = self.cost_per_1k <= other.cost_per_1k
        better_quality = self.quality_score >= other.quality_score

        strictly_better = (
            self.latency_p95 < other.latency_p95 or
            self.cost_per_1k < other.cost_per_1k or
            self.quality_score > other.quality_score
        )

        return better_latency and better_cost and better_quality and strictly_better

    def __repr__(self):
        return f"{self.name}: {self.latency_p95:.2f}s, ${self.cost_per_1k:.3f}/1k, {self.quality_score:.2f} quality"

def find_pareto_optimal(options: List[OptimizationOption]) -> List[OptimizationOption]:
    """Find Pareto optimal solutions (non-dominated options)."""
    pareto_optimal = []

    for option in options:
        is_dominated = False
        for other in options:
            if other.dominates(option):
                is_dominated = True
                break

        if not is_dominated:
            pareto_optimal.append(option)

    return pareto_optimal

# Example: RAG chatbot optimization
options = [
    OptimizationOption("GPT-4, no caching", latency_p95=2.5, cost_per_1k=10.0, quality_score=0.92),
    OptimizationOption("GPT-3.5, no caching", latency_p95=0.8, cost_per_1k=2.0, quality_score=0.78),
    OptimizationOption("GPT-3.5 + caching", latency_p95=0.6, cost_per_1k=1.2, quality_score=0.78),
    OptimizationOption("GPT-3.5 + caching + prompt eng", latency_p95=0.7, cost_per_1k=1.3, quality_score=0.85),
    OptimizationOption("GPT-4 + caching", latency_p95=2.0, cost_per_1k=6.0, quality_score=0.92),
    OptimizationOption("GPT-4-turbo + caching", latency_p95=1.2, cost_per_1k=4.0, quality_score=0.90),
]

# Find Pareto optimal
pareto = find_pareto_optimal(options)

print("Pareto Optimal Solutions:")
for opt in pareto:
    print(f"  {opt}")

# Output:
# Pareto Optimal Solutions:
#   GPT-3.5 + caching: 0.60s, $1.200/1k, 0.78 quality
#   GPT-3.5 + caching + prompt eng: 0.70s, $1.300/1k, 0.85 quality
#   GPT-4 + caching: 2.00s, $6.000/1k, 0.92 quality
#   GPT-4-turbo + caching: 1.20s, $4.000/1k, 0.90 quality

# Interpretation:
# - If budget-conscious: GPT-3.5 + caching + prompt eng ($1.30/1k, 0.85 quality)
# - If quality-critical: GPT-4-turbo + caching ($4/1k, 0.90 quality, faster than GPT-4)
# - If maximum quality is needed: GPT-4 + caching ($6/1k, 0.92 quality)
```

### Requirements-Based Selection

```python
from typing import List, Optional

def select_optimal_solution(
    options: List[OptimizationOption],
    max_latency: Optional[float] = None,
    max_cost: Optional[float] = None,
    min_quality: Optional[float] = None
) -> OptimizationOption:
    """
    Select the optimal solution given constraints.

    Args:
        options: Available options
        max_latency: Maximum acceptable latency (seconds)
        max_cost: Maximum cost per 1k queries (dollars)
        min_quality: Minimum quality score (0-1)

    Returns:
        Best option meeting all constraints
    """
    # Filter options meeting constraints
    feasible = []
    for opt in options:
        meets_latency = max_latency is None or opt.latency_p95 <= max_latency
        meets_cost = max_cost is None or opt.cost_per_1k <= max_cost
        meets_quality = min_quality is None or opt.quality_score >= min_quality

        if meets_latency and meets_cost and meets_quality:
            feasible.append(opt)

    if not feasible:
        raise ValueError("No solution meets all constraints")

    # Among feasible options, select the best cost-quality trade-off
    best = min(feasible, key=lambda opt: opt.cost_per_1k / opt.quality_score)

    return best

# Example requirements
requirements = {
    "max_latency": 1.0,   # Must respond within 1 second
    "max_cost": 5.0,      # Budget: $5 per 1k queries
    "min_quality": 0.85   # Minimum 85% quality
}

selected = select_optimal_solution(
    options,
    max_latency=requirements["max_latency"],
    max_cost=requirements["max_cost"],
    min_quality=requirements["min_quality"]
)

print(f"Selected solution: {selected}")
# Output: Selected solution: GPT-3.5 + caching + prompt eng: 0.70s, $1.300/1k, 0.85 quality
# (Meets all constraints and is the most cost-effective)
```

## Part 6: Production Monitoring

### Performance Metrics Tracking

```python
from dataclasses import dataclass
from typing import Dict, List

import numpy as np

@dataclass
class QueryMetrics:
    """Metrics for a single query."""
    latency_ms: float
    input_tokens: int
    output_tokens: int
    cost: float
    cache_hit: bool
    model: str

class PerformanceMonitor:
    """Track and analyze performance metrics."""

    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def log_query(
        self,
        latency_ms: float,
        input_tokens: int,
        output_tokens: int,
        cost: float,
        cache_hit: bool,
        model: str
    ):
        """Log query metrics."""
        self.metrics.append(QueryMetrics(
            latency_ms=latency_ms,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            cache_hit=cache_hit,
            model=model
        ))

    def summary(self) -> Dict:
        """Generate summary statistics."""
        if not self.metrics:
            return {}

        latencies = [m.latency_ms for m in self.metrics]
        costs = [m.cost for m in self.metrics]
        cache_hits = [m.cache_hit for m in self.metrics]

        return {
            "total_queries": len(self.metrics),
            "latency_p50": np.percentile(latencies, 50),
            "latency_p95": np.percentile(latencies, 95),
            "latency_p99": np.percentile(latencies, 99),
            "avg_cost": np.mean(costs),
            "total_cost": np.sum(costs),
            "cache_hit_rate": np.mean(cache_hits) * 100,
            "queries_per_model": self._count_by_model()
        }

    def _count_by_model(self) -> Dict[str, int]:
        """Count queries by model."""
        counts = {}
        for m in self.metrics:
            counts[m.model] = counts.get(m.model, 0) + 1
        return counts

# Example usage
monitor = PerformanceMonitor()

# Simulate queries
for i in range(1000):
    cache_hit = np.random.random() < 0.6   # 60% cache hit rate
    latency = 100 if cache_hit else 800    # Cache: 100ms, API: 800ms
    cost = 0 if cache_hit else 0.002

    monitor.log_query(
        latency_ms=latency,
        input_tokens=500,
        output_tokens=200,
        cost=cost,
        cache_hit=cache_hit,
        model="gpt-3.5-turbo"
    )

# Generate summary
summary = monitor.summary()

print("Performance Summary:")
print(f"  Total queries: {summary['total_queries']}")
print(f"  Latency P50: {summary['latency_p50']:.0f}ms")
print(f"  Latency P95: {summary['latency_p95']:.0f}ms")
print(f"  Avg cost: ${summary['avg_cost']:.4f}")
print(f"  Total cost: ${summary['total_cost']:.2f}")
print(f"  Cache hit rate: {summary['cache_hit_rate']:.1f}%")
```

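The framework's step 5 also calls for detecting regressions, not just recording metrics. One minimal approach, building on `PerformanceMonitor` above, compares a recent window against a stored baseline; the window size and 20% tolerance are illustrative thresholds:

```python
def detect_regressions(monitor: PerformanceMonitor,
                       baseline: Dict,
                       window: int = 200,
                       tolerance: float = 0.20) -> List[str]:
    """Compare the most recent `window` queries against baseline p95 latency and avg cost."""
    recent = monitor.metrics[-window:]
    if not recent:
        return []

    alerts = []
    p95 = np.percentile([m.latency_ms for m in recent], 95)
    avg_cost = np.mean([m.cost for m in recent])

    if p95 > baseline["latency_p95"] * (1 + tolerance):
        alerts.append(f"Latency regression: p95 {p95:.0f}ms vs baseline {baseline['latency_p95']:.0f}ms")
    if avg_cost > baseline["avg_cost"] * (1 + tolerance):
        alerts.append(f"Cost regression: avg ${avg_cost:.4f} vs baseline ${baseline['avg_cost']:.4f}")
    return alerts

# Example: freeze today's summary as the baseline, then re-check periodically.
baseline = monitor.summary()
for alert in detect_regressions(monitor, baseline):
    print(f"[ALERT] {alert}")
```
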
## Summary

**Inference optimization is systematic, not ad-hoc.**

**Core techniques:**
1. **Parallelization:** Async/await (10× throughput), Batch API (50% cheaper)
2. **Caching:** Answer caching (60% savings), Prompt caching (90% savings)
3. **Model routing:** GPT-3.5 for simple tasks (10-20× cheaper), GPT-4 for complex ones
4. **Streaming:** First token in ~0.5s (vs a 20s wait), 35pp better completion rate
5. **Multi-objective:** Pareto analysis (balance cost, latency, and quality)

**Checklist:**
1. ✓ Measure baseline (latency, cost, quality)
2. ✓ Set requirements (acceptable latency, budget, quality threshold)
3. ✓ Parallelize batch processing (10× throughput)
4. ✓ Implement caching (60-90% cost savings)
5. ✓ Route by task complexity (10-20× cost savings on simple tasks)
6. ✓ Stream long responses (better UX)
7. ✓ Analyze cost-latency-quality trade-offs (Pareto optimal)
8. ✓ Monitor production metrics (track improvements, catch regressions)

Production-ready performance requires deliberate optimization across multiple dimensions.