---
name: anomaly-detector
description: |
  Anomaly and outlier detection using Isolation Forest, One-Class SVM,
  autoencoders, and statistical methods. Activates for "anomaly detection",
  "outlier detection", "fraud detection", "intrusion detection",
  "abnormal behavior", "unusual patterns", "detect anomalies",
  "system monitoring". Handles supervised and unsupervised anomaly detection
  with SpecWeave increment integration.
---

# Anomaly Detector

## Overview

Detect unusual patterns, outliers, and anomalies in data using statistical methods, machine learning, and deep learning. Critical for fraud detection, security monitoring, quality control, and system health monitoring, all integrated with SpecWeave's increment workflow.

## Why Anomaly Detection is Different

**Challenge**: Anomalies are rare (0.1% - 5% of data)

**Standard classification doesn't work**:
- ❌ Extreme class imbalance
- ❌ Unknown anomaly patterns
- ❌ Expensive to label anomalies
- ❌ Anomalies evolve over time

**Anomaly detection approaches**:
- ✅ Unsupervised (no labels needed)
- ✅ Semi-supervised (learn from normal data)
- ✅ Statistical (deviation from expected)
- ✅ Context-aware (what's normal for this user/time/location?)

## Anomaly Detection Methods

### 1. Statistical Methods (Baseline)

**Z-Score / Standard Deviation**:
```python
from specweave import AnomalyDetector

detector = AnomalyDetector(
    method="statistical",
    increment="0042"
)

# Flag values > 3 standard deviations from mean
anomalies = detector.detect(
    data=transaction_amounts,
    threshold=3.0
)

# Simple, fast, but assumes normal distribution
```

**IQR (Interquartile Range)**:
```python
# More robust to non-normal distributions
detector = AnomalyDetector(method="iqr")

# Flag values outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]
anomalies = detector.detect(data=response_times)

# Good for skewed distributions
```

### 2. Isolation Forest (Recommended)

**Best for**: General purpose, high-dimensional data

```python
from specweave import IsolationForestDetector

detector = IsolationForestDetector(
    contamination=0.05,  # Expected anomaly rate (5%)
    increment="0042"
)

# Train on normal data (or mixed data)
detector.fit(X_train)

# Detect anomalies
predictions = detector.predict(X_test)  # -1 = anomaly, 1 = normal
anomaly_scores = detector.score(X_test)  # Lower score = more anomalous

# Generates:
# - Anomaly scores for all samples
# - Feature importance (which features contribute to anomaly)
# - Threshold visualization
# - Top anomalies ranked by score
```

**Why Isolation Forest works**:
- Fast (O(n log n))
- Handles high dimensions well
- No assumptions about data distribution
- Anomalies are easier to isolate (fewer splits)

### 3. One-Class SVM

**Best for**: When you have only normal data for training

```python
from specweave import OneClassSVMDetector

# Train only on normal transactions
detector = OneClassSVMDetector(
    kernel='rbf',
    nu=0.05,  # Upper bound on the fraction of training outliers (roughly the expected anomaly rate)
    increment="0042"
)

detector.fit(X_normal)

# Detect anomalies in new data
predictions = detector.predict(X_new)  # -1 = anomaly, 1 = normal

# Good for: Clean training data of normal samples
```

### 4. Autoencoders (Deep Learning)

**Best for**: Complex patterns, high-dimensional data, images

```python
from specweave import AutoencoderDetector

# Learn to reconstruct normal data
detector = AutoencoderDetector(
    encoding_dim=32,  # Compressed representation
    layers=[64, 32, 16, 32, 64],
    increment="0042"
)

# Train on normal data
detector.fit(
    X_normal,
    epochs=100,
    validation_split=0.2
)

# Anomalies have high reconstruction error
anomaly_scores = detector.score(X_test)

# Generates:
# - Reconstruction error distribution
# - Threshold recommendation
# - Top anomalies with explanations
# - Learned representations (t-SNE plot)
```

**How autoencoders work**:
```
Input → Encoder → Compressed → Decoder → Reconstructed

Normal data: Low reconstruction error (learned well)
Anomalies: High reconstruction error (never seen before)
```

### 5. LOF (Local Outlier Factor)

**Best for**: Density-based anomalies (sparse regions)

```python
from specweave import LOFDetector

# Detects points in low-density regions
detector = LOFDetector(
    n_neighbors=20,
    contamination=0.05,
    increment="0042"
)

detector.fit(X_train)
predictions = detector.predict(X_test)

# Good for: Clustered data with sparse anomalies
```

## Anomaly Detection Workflows

### Workflow 1: Fraud Detection

```python
from specweave import FraudDetectionPipeline

pipeline = FraudDetectionPipeline(increment="0042")

# Features: transaction amount, location, time, merchant, etc.
pipeline.fit(normal_transactions)

# Real-time fraud detection
fraud_scores = pipeline.predict_proba(new_transactions)

# For each transaction:
# - Fraud probability (0-1)
# - Anomaly score
# - Contributing features
# - Similar past cases

# Generates:
# - Precision-Recall curve (fraud is rare)
# - Cost-benefit analysis (false positives vs missed fraud)
# - Feature importance for fraud
# - Fraud patterns identified
```

**Fraud Detection Best Practices**:

```python
# 1. Use multiple signals
pipeline.add_signals([
    'amount_vs_user_average',
    'distance_from_home',
    'merchant_risk_score',
    'velocity_24h'  # Transactions in last 24h
])

# 2. Set threshold based on cost
# False Positive cost: $5 (manual review)
# False Negative cost: $500 (fraud loss)
# Optimal threshold: Maximize (savings - review_cost)

# 3. Provide explanations
explanation = pipeline.explain_prediction(suspicious_transaction)
# "Flagged because: amount 10x user average, new merchant, foreign location"
```

### Workflow 2: System Anomaly Detection

```python
from specweave import SystemAnomalyPipeline

# Monitor system metrics (CPU, memory, latency, errors)
pipeline = SystemAnomalyPipeline(increment="0042")

# Train on normal system behavior
pipeline.fit(normal_metrics)

# Detect system anomalies
anomalies = pipeline.detect(current_metrics)

# For each anomaly:
# - Severity (low, medium, high, critical)
# - Affected metrics
# - Similar past incidents
# - Recommended actions

# Generates:
# - Anomaly timeline
# - Metric correlations (which metrics moved together)
# - Root cause analysis
# - Alert rules
```

**System Monitoring Best Practices**:

```python
# 1. Use time windows
pipeline.add_time_windows([
    '5min',   # Immediate spikes
    '1hour',  # Short-term trends
    '24hour'  # Daily patterns
])

# 2. Correlate metrics
pipeline.detect_correlations([
    ('high_cpu', 'slow_response'),
    ('memory_leak', 'increasing_errors')
])

# 3. Reduce alert fatigue
pipeline.set_alert_rules(
    min_severity='medium',
    min_duration='5min',  # Ignore transient spikes
    max_alerts_per_hour=5
)
```

### Workflow 3: Manufacturing Quality Control

```python
from specweave import QualityControlPipeline

# Detect defective products from sensor data
pipeline = QualityControlPipeline(increment="0042")

# Train on good products
pipeline.fit(good_product_sensors)

# Detect defects in production line
defect_scores = pipeline.predict(production_line_data)

# Generates:
# - Real-time defect alerts
# - Defect rate trends
# - Most common defect patterns
# - Preventive maintenance recommendations
```

### Workflow 4: Network Intrusion Detection

```python
from specweave import IntrusionDetectionPipeline

# Detect malicious network traffic
pipeline = IntrusionDetectionPipeline(increment="0042")

# Features: packet size, frequency, ports, protocols, etc.
pipeline.fit(normal_network_traffic)

# Detect intrusions
intrusions = pipeline.detect(network_traffic_stream)

# Generates:
# - Attack type classification (DDoS, port scan, etc.)
# - Severity scores
# - Source IPs
# - Attack timeline
```

## Evaluation Metrics

**Anomaly detection metrics** (different from classification):

```python
from specweave import AnomalyEvaluator

evaluator = AnomalyEvaluator(increment="0042")

metrics = evaluator.evaluate(
    y_true=true_labels,  # 0=normal, 1=anomaly
    y_pred=predictions,  # Use the same 0/1 encoding as y_true (map -1 to 1 and 1 to 0 if the detector returns -1/1)
    y_scores=anomaly_scores
)
```

**Key Metrics**:

1. **Precision @ K** - Of top K flagged anomalies, how many are real?
   ```python
   precision_at_100 = evaluator.precision_at_k(k=100)
   # "Of 100 flagged transactions, 85 were actual fraud" = 85%
   ```

2. **Recall @ K** - Of all real anomalies, how many did we catch in top K?
   ```python
   recall_at_100 = evaluator.recall_at_k(k=100)
   # "We caught 78% of all fraud in top 100 flagged"
   ```

3. **ROC AUC** - Overall discrimination ability
   ```python
   roc_auc = evaluator.roc_auc(y_true, y_scores)
   # 0.95 = excellent discrimination
   ```

4. **PR AUC** - Better for imbalanced data
   ```python
   pr_auc = evaluator.pr_auc(y_true, y_scores)
   # More informative when anomalies are rare (<5%)
   ```

**Evaluation Report**:
```markdown
# Anomaly Detection Evaluation

## Dataset
- Total samples: 100,000
- Anomalies: 500 (0.5%)
- Features: 25

## Method: Isolation Forest

## Performance Metrics
- ROC AUC: 0.94 ✅ (excellent)
- PR AUC: 0.78 ✅ (good for 0.5% anomaly rate)

## Precision-Recall Tradeoff
- Precision @ 100: 85% (85 true anomalies in top 100)
- Recall @ 100: 17% (caught 17% of all anomalies)
- Precision @ 500: 62% (310 true anomalies in top 500)
- Recall @ 500: 62% (caught 62% of all anomalies)

## Business Impact (Fraud Detection Example)
- Review budget: 500 transactions/day
- At Precision @ 500 = 62%:
  - True fraud caught: 310/day ($155,000 saved)
  - False positives: 190/day ($950 review cost)
  - Net benefit: $154,050/day ✅

## Recommendation
✅ DEPLOY with threshold for top 500 (62% precision)
```

## Integration with SpecWeave

### Increment Structure

```
.specweave/increments/0042-fraud-detection/
├── spec.md (detection requirements, business impact)
├── plan.md (method selection, threshold tuning)
├── tasks.md
├── data/
│   ├── normal_transactions.csv
│   ├── labeled_fraud.csv (if available)
│   └── schema.yaml
├── experiments/
│   ├── statistical-baseline/
│   ├── isolation-forest/
│   ├── one-class-svm/
│   └── autoencoder/
├── models/
│   ├── isolation_forest_model.pkl
│   └── threshold_config.json
├── evaluation/
│   ├── precision_recall_curve.png
│   ├── roc_curve.png
│   ├── top_anomalies.csv
│   └── evaluation_report.md
└── deployment/
    ├── real_time_api.py
    ├── monitoring_dashboard.json
    └── alert_rules.yaml
```

## Best Practices

### 1. Start with Labeled Anomalies (if available)

```python
# Use labeled data to validate unsupervised methods
detector.fit(X_train)  # Unlabeled

# Evaluate on labeled test set
metrics = evaluator.evaluate(y_true_test, detector.predict(X_test))

# Choose method with best precision @ K
```

### 2. Tune Contamination Parameter

```python
# Try different contamination rates
for contamination in [0.01, 0.05, 0.1, 0.2]:
    detector = IsolationForestDetector(contamination=contamination)
    detector.fit(X_train)

    metrics = evaluator.evaluate(y_test, detector.predict(X_test))

# Choose contamination that maximizes business value
```

### 3. Explain Anomalies

```python
# Don't just flag anomalies - explain why
explainer = AnomalyExplainer(detector, increment="0042")

for anomaly in top_anomalies:
    explanation = explainer.explain(anomaly)
    print(f"Anomaly: {anomaly.id}")
    print("Reasons:")
    print(f"  - {explanation.top_features}")
    print(f"  - Similar cases: {explanation.similar_cases}")
```

### 4. Handle Concept Drift

```python
# Anomalies evolve over time
monitor = AnomalyMonitor(increment="0042")

# Track detection performance
monitor.track_daily_performance()

# Retrain when detection performance degrades
if monitor.performance_degraded():
    detector.retrain(new_normal_data)
```

### 5. Set Business-Driven Thresholds

```python
# Balance false positives vs false negatives
optimizer = ThresholdOptimizer(increment="0042")

optimal_threshold = optimizer.find_optimal(
    detector=detector,
    data=validation_data,
    false_positive_cost=5,    # $5 per manual review
    false_negative_cost=500   # $500 per missed fraud
)

# Use optimal threshold for deployment
```

## Advanced Features

### 1. Ensemble Anomaly Detection

```python
# Combine multiple detectors
ensemble = AnomalyEnsemble(increment="0042")

ensemble.add_detector("isolation_forest", weight=0.4)
ensemble.add_detector("one_class_svm", weight=0.3)
ensemble.add_detector("autoencoder", weight=0.3)

# Ensemble vote (more robust)
anomalies = ensemble.detect(X_test)
```

### 2. Contextual Anomaly Detection

```python
# What's normal varies by context
detector = ContextualAnomalyDetector(increment="0042")

# Different normality for different contexts
detector.fit(data, contexts=['user_id', 'time_of_day', 'location'])

# $10 transaction: Normal for user A, anomaly for user B
```

### 3. Sequential Anomaly Detection

```python
# Detect anomalous sequences (not just individual points)
detector = SequenceAnomalyDetector(
    method='lstm',
    window_size=10,
    increment="0042"
)

# Example: Login from unusual sequence of locations
```

## Commands

```bash
# Train anomaly detector
/ml:train-anomaly-detector 0042

# Evaluate detector
/ml:evaluate-anomaly-detector 0042

# Explain top anomalies
/ml:explain-anomalies 0042 --top 100
```

## Summary

Anomaly detection is critical for:
- ✅ Fraud detection (financial transactions)
- ✅ Security monitoring (intrusion detection)
- ✅ Quality control (manufacturing defects)
- ✅ System health (performance monitoring)
- ✅ Business intelligence (unusual patterns)

This skill provides well-established methods integrated with SpecWeave's increment workflow, so that anomaly detectors are reproducible, explainable, and business-aligned.
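
To make the statistical baselines, the Precision @ K and Recall @ K metrics, and the cost-based threshold idea concrete, here is a minimal standard-library sketch. It is not part of any SpecWeave API: the function names (`zscore_outliers`, `iqr_outliers`, `precision_recall_at_k`, `best_review_budget`) are hypothetical, and it assumes that a higher score means "more anomalous" and that labels use 0 = normal, 1 = anomaly. The $5 and $500 defaults mirror the example costs used in this document.

```python
from statistics import mean, pstdev, quantiles


def zscore_outliers(values, threshold=3.0):
    """Indices of values more than `threshold` standard deviations from the mean."""
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        return []  # all values identical: nothing can be flagged
    return [i for i, v in enumerate(values) if abs(v - mu) / sigma > threshold]


def iqr_outliers(values, k=1.5):
    """Indices of values outside [Q1 - k*IQR, Q3 + k*IQR]."""
    q1, _, q3 = quantiles(values, n=4)
    iqr = q3 - q1
    low, high = q1 - k * iqr, q3 + k * iqr
    return [i for i, v in enumerate(values) if v < low or v > high]


def precision_recall_at_k(y_true, scores, k):
    """Rank samples by score (descending) and evaluate the top k."""
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    hits = sum(y_true[i] for i in ranked[:k])
    total = sum(y_true)
    precision = hits / k
    recall = hits / total if total else 0.0
    return precision, recall


def best_review_budget(y_true, scores, fp_cost=5.0, fn_cost=500.0):
    """Number of top-scored cases to review that minimizes total cost.

    Cost = reviewed normals * fp_cost + unreviewed anomalies * fn_cost.
    """
    ranked = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)
    total_pos = sum(y_true)
    best_k, best_cost = 0, total_pos * fn_cost  # reviewing nothing misses everything
    hits = 0
    for k, i in enumerate(ranked, start=1):
        hits += y_true[i]
        cost = (k - hits) * fp_cost + (total_pos - hits) * fn_cost
        if cost < best_cost:
            best_k, best_cost = k, cost
    return best_k, best_cost


if __name__ == "__main__":
    amounts = [10, 12, 11, 13, 12, 11, 10, 12, 11, 12,
               10, 11, 13, 12, 11, 10, 12, 11, 12, 95]
    print("z-score outliers:", zscore_outliers(amounts))
    print("IQR outliers:", iqr_outliers(amounts))

    labels = [0, 0, 1, 0, 1, 0, 0, 1, 0, 0]
    scores = [0.1, 0.4, 0.9, 0.2, 0.8, 0.3, 0.05, 0.35, 0.6, 0.15]
    print("precision/recall @ 5:", precision_recall_at_k(labels, scores, k=5))
    print("best review budget:", best_review_budget(labels, scores))
```

One caveat worth knowing when using the z-score rule on small samples: with a population standard deviation, no point in a sample of n values can have a z-score above sqrt(n - 1), so a threshold of 3.0 can never fire on 10 or fewer points. The IQR rule does not have this limitation.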