# Machine Learning Integration

This reference covers Vaex's machine learning capabilities, including transformers, encoders, feature engineering, model integration, and building ML pipelines on large datasets.

## Overview

Vaex provides a comprehensive ML framework (`vaex.ml`) that works seamlessly with large datasets. The framework includes:

- Transformers for feature scaling and engineering
- Encoders for categorical variables
- Dimensionality reduction (PCA)
- Clustering algorithms
- Integration with scikit-learn, XGBoost, LightGBM, CatBoost, and Keras
- State management for production deployment

**Key advantage:** All transformations create virtual columns, so preprocessing doesn't increase memory usage.
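
As a small illustration of this point, a fitted transformer adds an expression (a virtual column) rather than a materialized array. A minimal sketch, using the built-in `vaex.example()` demo dataset and its `x` column:

```python
import vaex
import vaex.ml

df = vaex.example()  # small built-in demo dataset

# fit_transform adds 'standard_scaled_x' as a virtual column (an expression)
scaler = vaex.ml.StandardScaler(features=['x'])
df = scaler.fit_transform(df)

print(df.virtual_columns)  # no new array was allocated for the scaled column
```
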
## Feature Scaling

### Standard Scaler

```python
import vaex
import vaex.ml

df = vaex.open('data.hdf5')

# Fit standard scaler
scaler = vaex.ml.StandardScaler(features=['age', 'income', 'score'])
scaler.fit(df)

# Transform (creates virtual columns)
df = scaler.transform(df)

# Scaled columns created as: 'standard_scaled_age', 'standard_scaled_income', etc.
print(df.column_names)
```

### MinMax Scaler

```python
# Scale to [0, 1] range
minmax_scaler = vaex.ml.MinMaxScaler(features=['age', 'income'])
minmax_scaler.fit(df)
df = minmax_scaler.transform(df)

# Custom range
minmax_scaler = vaex.ml.MinMaxScaler(
    features=['age'],
    feature_range=(-1, 1)
)
```

### MaxAbs Scaler

```python
# Scale by maximum absolute value
maxabs_scaler = vaex.ml.MaxAbsScaler(features=['values'])
maxabs_scaler.fit(df)
df = maxabs_scaler.transform(df)
```

### Robust Scaler

```python
# Scale using median and IQR (robust to outliers)
robust_scaler = vaex.ml.RobustScaler(features=['income', 'age'])
robust_scaler.fit(df)
df = robust_scaler.transform(df)
```

## Categorical Encoding

### Label Encoder

```python
# Encode categorical to integers
label_encoder = vaex.ml.LabelEncoder(features=['category', 'region'])
label_encoder.fit(df)
df = label_encoder.transform(df)

# Creates: 'label_encoded_category', 'label_encoded_region'
```

### One-Hot Encoder

```python
# Create binary columns for each category
onehot = vaex.ml.OneHotEncoder(features=['category'])
onehot.fit(df)
df = onehot.transform(df)

# Creates columns like: 'category_A', 'category_B', 'category_C'

# Control prefix
onehot = vaex.ml.OneHotEncoder(
    features=['category'],
    prefix='cat_'
)
```

### Frequency Encoder

```python
# Encode by category frequency
freq_encoder = vaex.ml.FrequencyEncoder(features=['category'])
freq_encoder.fit(df)
df = freq_encoder.transform(df)

# Each category replaced by its frequency in the dataset
```

### Target Encoder (Mean Encoder)

```python
# Encode category by the (smoothed) target mean, for supervised learning;
# vaex-ml's target encoder is Bayesian: category means are shrunk toward
# the global mean, which also covers rare and unseen categories
target_encoder = vaex.ml.BayesianTargetEncoder(
    features=['category'],
    target='target_variable'
)
target_encoder.fit(df)
df = target_encoder.transform(df)
```

### Weight of Evidence Encoder

```python
# Encode for binary classification
woe_encoder = vaex.ml.WeightOfEvidenceEncoder(
    features=['category'],
    target='binary_target'
)
woe_encoder.fit(df)
df = woe_encoder.transform(df)
```

## Feature Engineering

### Binning/Discretization

```python
# Bin continuous variable into discrete bins
binner = vaex.ml.KBinsDiscretizer(
    features=['age'],
    n_bins=5,
    strategy='uniform'  # or 'quantile'
)
binner.fit(df)
df = binner.transform(df)
```

### Cyclic Transformations

```python
# Transform cyclic features (hour, day of week); n is the length of one
# cycle, so features with different periods use separate transformers
cyclic_hour = vaex.ml.CycleTransformer(features=['hour'], n=24)
df = cyclic_hour.fit_transform(df)

cyclic_day = vaex.ml.CycleTransformer(features=['day_of_week'], n=7)
df = cyclic_day.fit_transform(df)

# Creates sin and cos components for each feature
```

### PCA (Principal Component Analysis)

```python
# Dimensionality reduction
pca = vaex.ml.PCA(
    features=['feature1', 'feature2', 'feature3', 'feature4'],
    n_components=2
)
pca.fit(df)
df = pca.transform(df)

# Creates: 'PCA_0', 'PCA_1'

# Inspect the eigenvalues of the fitted components
print(pca.eigen_values_)
```
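
Assuming the fitted PCA exposes its eigenvalues via `eigen_values_` (as used above), the explained-variance ratio can be derived with a few lines of NumPy:

```python
import numpy as np

# Each eigenvalue is proportional to the variance captured by its component
eigen_values = np.array(pca.eigen_values_)
explained_variance_ratio = eigen_values / eigen_values.sum()
print(explained_variance_ratio)
```
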
### Random Projection

```python
# Fast dimensionality reduction
projector = vaex.ml.RandomProjection(
    features=['x1', 'x2', 'x3', 'x4', 'x5'],
    n_components=3
)
projector.fit(df)
df = projector.transform(df)
```

## Clustering

### K-Means

```python
# Cluster data
kmeans = vaex.ml.KMeans(
    features=['feature1', 'feature2', 'feature3'],
    n_clusters=5,
    max_iter=100
)
kmeans.fit(df)
df = kmeans.transform(df)

# Creates 'prediction' column with cluster labels

# Access cluster centers
print(kmeans.cluster_centers_)
```

## Integration with External Libraries

### Scikit-Learn

```python
from sklearn.ensemble import RandomForestClassifier
import vaex.ml.sklearn

# Prepare data
train_df = df[df.split == 'train']
test_df = df[df.split == 'test']

# Features and target
features = ['feature1', 'feature2', 'feature3']
target = 'target'

# Define the underlying scikit-learn model
model = RandomForestClassifier(n_estimators=100)

# Wrap it in a Vaex predictor so it can be fit on Vaex DataFrames
sklearn_model = vaex.ml.sklearn.Predictor(
    features=features,
    target=target,
    model=model,
    prediction_name='rf_prediction'
)
sklearn_model.fit(train_df)

# Predict (creates virtual column)
test_df = sklearn_model.transform(test_df)

# Access predictions
predictions = test_df.rf_prediction.values
```

### XGBoost

```python
import vaex.ml.xgboost

# Booster parameters
params = {
    'max_depth': 6,
    'eta': 0.1,
    'objective': 'reg:squarederror',
    'eval_metric': 'rmse'
}

# Create the XGBoost model; booster parameters and the number of
# boosting rounds are part of the model specification
booster = vaex.ml.xgboost.XGBoostModel(
    features=features,
    target=target,
    prediction_name='xgb_pred',
    params=params,
    num_boost_round=100
)

# Train
booster.fit(train_df)

# Predict
test_df = booster.transform(test_df)
```

### LightGBM

```python
import vaex.ml.lightgbm

# Parameters
params = {
    'objective': 'binary',
    'metric': 'auc',
    'num_leaves': 31,
    'learning_rate': 0.05
}

# Create the LightGBM model
lgb_model = vaex.ml.lightgbm.LightGBMModel(
    features=features,
    target=target,
    prediction_name='lgb_pred',
    params=params,
    num_boost_round=100
)

# Train
lgb_model.fit(train_df)

# Predict
test_df = lgb_model.transform(test_df)
```

### CatBoost

```python
import vaex.ml.catboost

# Parameters
params = {
    'depth': 6,
    'learning_rate': 0.1,
    'loss_function': 'Logloss'
}

# Create the CatBoost model; the number of iterations is passed
# as the number of boosting rounds
catboost_model = vaex.ml.catboost.CatBoostModel(
    features=features,
    target=target,
    prediction_name='catboost_pred',
    params=params,
    num_boost_round=100
)

# Train
catboost_model.fit(train_df)

# Predict
test_df = catboost_model.transform(test_df)
```

### Keras/TensorFlow

```python
from tensorflow import keras
import vaex.ml

# Define Keras model
def create_model(input_dim):
    model = keras.Sequential([
        keras.layers.Dense(64, activation='relu', input_shape=(input_dim,)),
        keras.layers.Dense(32, activation='relu'),
        keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

# Wrap in Vaex
keras_model = vaex.ml.keras.KerasModel(
    features=features,
    target=target,
    model=create_model(len(features)),
    prediction_name='keras_pred'
)

# Train
keras_model.fit(
    train_df,
    epochs=10,
    batch_size=10000
)

# Predict
test_df = keras_model.transform(test_df)
```

## Building ML Pipelines

### Sequential Pipeline

```python
import vaex.ml

# Create preprocessing pipeline
pipeline = []

# Step 1: Encode categorical
label_enc = vaex.ml.LabelEncoder(features=['category'])
pipeline.append(label_enc)

# Step 2: Scale features
scaler = vaex.ml.StandardScaler(features=['age', 'income'])
pipeline.append(scaler)

# Step 3: PCA
pca = vaex.ml.PCA(features=['age', 'income'], n_components=2)
pipeline.append(pca)

# Fit pipeline
for step in pipeline:
    step.fit(df)
    df = step.transform(df)

# Or, equivalently, fit and transform in one step
# (use one loop or the other, not both)
for step in pipeline:
    df = step.fit_transform(df)
```

### Complete ML Pipeline

```python
import vaex
import vaex.ml
import vaex.ml.sklearn
from sklearn.ensemble import RandomForestClassifier

# Load data
df = vaex.open('data.hdf5')

# Split data
train_df = df[df.year < 2020]
test_df = df[df.year >= 2020]

# Define pipeline
# 1. Categorical encoding
cat_encoder = vaex.ml.LabelEncoder(features=['category', 'region'])

# 2. Feature scaling
scaler = vaex.ml.StandardScaler(features=['age', 'income', 'score'])

# 3. Model
features = ['label_encoded_category', 'label_encoded_region',
            'standard_scaled_age', 'standard_scaled_income', 'standard_scaled_score']
model = vaex.ml.sklearn.Predictor(
    features=features,
    target='target',
    model=RandomForestClassifier(n_estimators=100),
    prediction_name='prediction'
)

# Fit pipeline
train_df = cat_encoder.fit_transform(train_df)
train_df = scaler.fit_transform(train_df)
model.fit(train_df)

# Apply to test
test_df = cat_encoder.transform(test_df)
test_df = scaler.transform(test_df)
test_df = model.transform(test_df)

# Evaluate
accuracy = (test_df.prediction == test_df.target).mean()
print(f"Accuracy: {accuracy:.4f}")
```

## State Management and Deployment

### Saving Pipeline State

```python
# After fitting all transformers and model,
# save the entire pipeline state
train_df.state_write('pipeline_state.json')

# In production: load fresh data and apply transformations
prod_df = vaex.open('new_data.hdf5')
prod_df.state_load('pipeline_state.json')

# All transformations and models are applied
predictions = prod_df.prediction.values
```
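
Because the state file captures the whole pipeline, a production scoring helper reduces to a few lines. A minimal sketch, reusing the file and column names from the example above:

```python
import vaex

def score_file(data_path, state_path='pipeline_state.json'):
    """Open a data file, apply the saved pipeline state, and return predictions."""
    df = vaex.open(data_path)
    df.state_load(state_path)
    return df.prediction.values

predictions = score_file('new_data.hdf5')
```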

### Transferring State Between DataFrames

```python
# Fit on training data
train_df = cat_encoder.fit_transform(train_df)
train_df = scaler.fit_transform(train_df)
model.fit(train_df)

# Save state
train_df.state_write('model_state.json')

# Apply to test data
test_df.state_load('model_state.json')

# Apply to validation data
val_df.state_load('model_state.json')
```

### Exporting with Transformations

```python
# Export DataFrame with all virtual columns materialized
df_with_features = df.copy()
df_with_features = df_with_features.materialize()
df_with_features.export_hdf5('processed_data.hdf5')
```

## Model Evaluation

### Classification Metrics

```python
# Binary classification
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score

y_true = test_df.target.values
y_pred = test_df.prediction.values
y_proba = test_df.prediction_proba.values if 'prediction_proba' in test_df.column_names else None

accuracy = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
if y_proba is not None:
    auc = roc_auc_score(y_true, y_proba)

print(f"Accuracy: {accuracy:.4f}")
print(f"F1 Score: {f1:.4f}")
if y_proba is not None:
    print(f"AUC-ROC: {auc:.4f}")
```

### Regression Metrics

```python
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

y_true = test_df.target.values
y_pred = test_df.prediction.values

mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)

print(f"RMSE: {rmse:.4f}")
print(f"MAE: {mae:.4f}")
print(f"R²: {r2:.4f}")
```

### Cross-Validation

```python
# Manual K-fold cross-validation
import numpy as np

# Create fold indices
df['fold'] = np.random.randint(0, 5, len(df))

results = []
for fold in range(5):
    train = df[df.fold != fold]
    val = df[df.fold == fold]

    # Fit pipeline
    train = encoder.fit_transform(train)
    train = scaler.fit_transform(train)
    model.fit(train)

    # Validate
    val = encoder.transform(val)
    val = scaler.transform(val)
    val = model.transform(val)

    accuracy = (val.prediction == val.target).mean()
    results.append(accuracy)

print(f"CV Accuracy: {np.mean(results):.4f} ± {np.std(results):.4f}")
```

## Feature Selection

### Correlation-Based

```python
# Compute correlations with target
correlations = {}
for feature in features:
    corr = df.correlation(df[feature], df.target)
    correlations[feature] = abs(corr)

# Sort by correlation
sorted_features = sorted(correlations.items(), key=lambda x: x[1], reverse=True)
top_features = [f[0] for f in sorted_features[:10]]

print("Top 10 features:", top_features)
```

### Variance-Based

```python
# Remove low-variance features
feature_variances = {}
for feature in features:
    var = df[feature].std() ** 2
    feature_variances[feature] = var

# Keep features with variance above threshold
threshold = 0.01
selected_features = [f for f, v in feature_variances.items() if v > threshold]
```

## Handling Imbalanced Data

### Class Weights

```python
# Compute class weights from the class counts (binary target)
total = len(df)
n_class0 = len(df[df.target == 0])
n_class1 = len(df[df.target == 1])
weights = {
    0: total / (2 * n_class0),
    1: total / (2 * n_class1)
}

# Use in model
model = RandomForestClassifier(class_weight=weights)
```

### Undersampling

```python
# Undersample the majority class down to the minority class size
minority_count = len(df[df.target == 1])

# Sample from majority class
majority_sampled = df[df.target == 0].sample(n=minority_count)
minority_all = df[df.target == 1]

# Combine
df_balanced = vaex.concat([majority_sampled, minority_all])
```
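
For reproducible balancing, the sampling can be seeded; a one-line variant, assuming `sample` accepts a `random_state` argument as in its pandas counterpart:

```python
majority_sampled = df[df.target == 0].sample(n=minority_count, random_state=42)
```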

### Oversampling (SMOTE alternative)

```python
# Duplicate minority class samples
minority = df[df.target == 1]
majority = df[df.target == 0]

# Replicate minority
minority_oversampled = vaex.concat([minority] * 5)

# Combine
df_balanced = vaex.concat([majority, minority_oversampled])
```

## Common Patterns

### Pattern: End-to-End Classification Pipeline

```python
import vaex
import vaex.ml
import vaex.ml.sklearn
from sklearn.ensemble import RandomForestClassifier

# Load and split
df = vaex.open('data.hdf5')
train = df[df.split == 'train']
test = df[df.split == 'test']

# Preprocessing
# Categorical encoding
cat_enc = vaex.ml.LabelEncoder(features=['cat1', 'cat2'])
train = cat_enc.fit_transform(train)

# Feature scaling
scaler = vaex.ml.StandardScaler(features=['num1', 'num2', 'num3'])
train = scaler.fit_transform(train)

# Model training
features = ['label_encoded_cat1', 'label_encoded_cat2',
            'standard_scaled_num1', 'standard_scaled_num2', 'standard_scaled_num3']
model = vaex.ml.sklearn.Predictor(
    features=features,
    target='target',
    model=RandomForestClassifier(n_estimators=100)
)
model.fit(train)

# Save state
train.state_write('production_pipeline.json')

# Apply to test
test.state_load('production_pipeline.json')

# Evaluate
accuracy = (test.prediction == test.target).mean()
print(f"Test Accuracy: {accuracy:.4f}")
```

### Pattern: Feature Engineering Pipeline

```python
import numpy as np

# Create rich features
df['age_squared'] = df.age ** 2
df['income_log'] = df.income.log()
df['age_income_interaction'] = df.age * df.income

# Binning
df['age_bin'] = df.age.digitize([0, 18, 30, 50, 65, 100])

# Cyclic features
df['hour_sin'] = (2 * np.pi * df.hour / 24).sin()
df['hour_cos'] = (2 * np.pi * df.hour / 24).cos()

# Aggregate features
avg_by_category = df.groupby('category').agg({'income': 'mean'})
# Join back to create feature
df = df.join(avg_by_category, on='category', rsuffix='_category_mean')
```

## Best Practices

1. **Use virtual columns** - Transformers create virtual columns (no memory overhead)
2. **Save state files** - Enable easy deployment and reproduction
3. **Batch operations** - Use `delay=True` when computing multiple features (see the sketch after this list)
4. **Feature scaling** - Always scale features before PCA or distance-based algorithms
5. **Encode categories** - Use the appropriate encoder (label, one-hot, target)
6. **Cross-validate** - Always validate on held-out data
7. **Monitor memory** - Check memory usage with `df.byte_size()`
8. **Export checkpoints** - Save intermediate results in long pipelines
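
A minimal sketch of practice 3: with `delay=True`, several aggregations are scheduled and then computed in a single pass over the data (assuming a DataFrame `df` with `age` and `income` columns):

```python
# Schedule both aggregations without computing anything yet
mean_age = df.age.mean(delay=True)
mean_income = df.income.mean(delay=True)

# One pass over the data resolves both delayed results
df.execute()

print(mean_age.get(), mean_income.get())
```
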
## Related Resources

- For data preprocessing: See `data_processing.md`
- For performance optimization: See `performance.md`
- For DataFrame operations: See `core_dataframes.md`