# Performance and Optimization

This reference covers Vaex's performance features including lazy evaluation, caching, memory management, async operations, and optimization strategies for processing massive datasets.

## Understanding Lazy Evaluation

Lazy evaluation is the foundation of Vaex's performance:

### How Lazy Evaluation Works

```python
import vaex

df = vaex.open('large_file.hdf5')

# No computation happens here - these just define what to compute
df['total'] = df.price * df.quantity
df['log_price'] = df.price.log()

# Computation happens here - aggregations run when you ask for the result
result = df.total.mean()  # Now the mean is actually calculated
```

**Key concepts:**

- **Expressions** are lazy - they define computations without executing them
- **Materialization** happens when you access the result
- **Query optimization** happens automatically before execution

### When Does Evaluation Happen?

```python
# These trigger evaluation:
print(df.x.mean())             # Computing an aggregation
array = df.x.values            # Getting a NumPy array
pdf = df.to_pandas_df()        # Converting to pandas
df.export_hdf5('output.hdf5')  # Exporting

# These do NOT trigger evaluation:
df['new_col'] = df.x + df.y   # Creating a virtual column
expr = df.x.mean(delay=True)  # Scheduling a delayed task
df_filtered = df[df.x > 10]   # Creating a filtered view
```

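Because nothing has executed yet, you can inspect exactly what a virtual column or expression will compute. A small sketch, reusing the columns above:

```python
# A virtual column stores only its expression, not data
df['new_col'] = df.x + df.y
print(df.virtual_columns['new_col'])  # The stored formula, e.g. '(x + y)'

# Standalone expressions expose the same information
expr = df.x + df.y
print(expr.expression)  # The formula Vaex will evaluate lazily
```
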
## Batching Operations with delay=True

Execute multiple operations together for better performance:

### Basic Delayed Execution

```python
# Without delay - each aggregation is a separate pass over the data
mean_x = df.x.mean()  # Pass 1 through data
std_x = df.x.std()    # Pass 2 through data
max_x = df.x.max()    # Pass 3 through data

# With delay - schedule the tasks, then run them together
mean_x = df.x.mean(delay=True)
std_x = df.x.std(delay=True)
max_x = df.x.max(delay=True)
df.execute()  # Single pass!

# Delayed results are promises; read them with .get()
print(mean_x.get())  # mean
print(std_x.get())   # std
print(max_x.get())   # max
```

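Delayed results can also be combined before anything runs. Vaex provides a `@vaex.delayed` decorator whose wrapped function is called once its delayed inputs resolve; a minimal sketch, reusing `df` from above:

```python
import vaex

@vaex.delayed
def signal_to_noise(mean, std):
    # Called automatically once both delayed inputs are resolved
    return mean / std

snr = signal_to_noise(df.x.mean(delay=True), df.x.std(delay=True))
df.execute()      # One pass computes both inputs
print(snr.get())  # Combined result
```
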
### Delayed Execution with Multiple Columns

```python
# Compute statistics for many columns efficiently
columns = ['sales', 'quantity', 'profit', 'cost']
delayed_stats = {}

for column in columns:
    delayed_stats[column] = (
        df[column].mean(delay=True),
        df[column].std(delay=True),
    )

# Execute all scheduled tasks in a single pass
df.execute()

# Collect results from the resolved promises
stats = {
    column: {'mean': mean.get(), 'std': std.get()}
    for column, (mean, std) in delayed_stats.items()
}
```

### When to Use delay=True

Use `delay=True` when:

- Computing multiple aggregations
- Computing statistics on many columns
- Building dashboards or reports
- Any scenario requiring multiple passes through data

```python
# Bad: 4 passes through the dataset
mean1 = df.col1.mean()
mean2 = df.col2.mean()
mean3 = df.col3.mean()
mean4 = df.col4.mean()

# Good: 1 pass through the dataset
means = [df[col].mean(delay=True) for col in ['col1', 'col2', 'col3', 'col4']]
df.execute()
mean1, mean2, mean3, mean4 = [m.get() for m in means]
```

## Asynchronous Operations

Process data asynchronously using async/await:

### Async with async/await

```python
import vaex
import asyncio

async def compute_statistics(df):
    # Schedule delayed tasks
    mean_task = df.x.mean(delay=True)
    std_task = df.x.std(delay=True)

    # Execute without blocking the event loop
    await df.execute_async()

    return {'mean': mean_task.get(), 'std': std_task.get()}

async def main():
    df = vaex.open('large_file.hdf5')
    stats = await compute_statistics(df)
    print(stats)

asyncio.run(main())
```

### Using Promises/Futures

```python
# Scheduling a delayed task returns a promise object
promise = df.x.mean(delay=True)

# Do other work, schedule more delayed tasks...

# Run the scheduled tasks, then read the result
df.execute()
result = promise.get()
```

## Virtual Columns vs Materialized Columns

Understanding the difference is crucial for performance:

### Virtual Columns (Preferred)

```python
# Virtual column - computed on-the-fly, zero memory
df['total'] = df.price * df.quantity
df['log_sales'] = df.sales.log()
df['full_name'] = df.first_name + ' ' + df.last_name

# Check which columns are virtual
print('total' in df.virtual_columns)  # True = virtual

# Benefits:
# - Zero memory overhead
# - Always up-to-date if source data changes
# - Fast to create
```

### Materialized Columns

```python
# Materialize a virtual column by assigning its evaluated values
df['total_materialized'] = df['total'].values

# Or use the materialize method (returns a new DataFrame)
df = df.materialize('total')

# Materialized columns are no longer virtual
print('total' in df.virtual_columns)  # False = materialized

# When to materialize:
# - Column computed repeatedly (amortize cost)
# - Complex expression used in many operations
# - Need to export data
```

### Deciding: Virtual vs Materialized

```python
# Virtual is better when:
# - Column is simple (x + y, x * 2, etc.)
# - Column used infrequently
# - Memory is limited

# Materialize when:
# - Complex computation (multiple operations)
# - Used repeatedly in aggregations
# - Slows down other operations

# Example: Complex calculation used many times
df['complex'] = (df.x.log() * df.y.sqrt() + df.z ** 2).values  # Materialize
```

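When in doubt, measure: time the same aggregation on the virtual column and again after materializing it. A minimal sketch (the column name `complex_v` is illustrative):

```python
import time

def timed(label, fn):
    t0 = time.perf_counter()
    fn()
    print(f"{label}: {time.perf_counter() - t0:.3f}s")

# Virtual: the expression is recomputed during the aggregation
df['complex_v'] = df.x.log() * df.y.sqrt() + df.z ** 2
timed("virtual", lambda: df.complex_v.mean())

# Materialized: the aggregation reads precomputed values
df = df.materialize('complex_v')
timed("materialized", lambda: df.complex_v.mean())
```
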
## Caching Strategies

Once its cache is enabled, Vaex reuses computed results automatically, and you can optimize further with state files and checkpoints:

### Automatic Caching

```python
import vaex

# Result caching is opt-in; enable an unbounded in-memory cache
vaex.cache.memory_infinite()

# First call computes and caches
mean1 = df.x.mean()  # Computes

# Second call uses the cache
mean2 = df.x.mean()  # From cache (near-instant)

# Cache keys track the data and the expression, so a changed
# DataFrame leads to a fresh computation
df_filtered = df[df.x > 0]
mean3 = df_filtered.x.mean()  # Recomputes
```

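If cached results should survive the process, recent Vaex versions also provide a disk-backed cache in the same `vaex.cache` module; a sketch, assuming such a version is installed:

```python
import vaex

# Disk-backed cache: results persist across processes and restarts
vaex.cache.disk()

df = vaex.open('data.hdf5')
df.x.mean()  # Computed once, then served from the on-disk cache
```
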
### State Management

```python
# Save DataFrame state (includes virtual columns)
df.state_write('state.json')

# Load state later
df_new = vaex.open('data.hdf5')
df_new.state_load('state.json')  # Restores virtual columns, selections
```

### Checkpoint Pattern

```python
# Export intermediate results for complex pipelines
df['processed'] = complex_calculation(df)  # Some expensive expression

# Save checkpoint (writes the evaluated columns to disk)
df.export_hdf5('checkpoint.hdf5')

# Resume from checkpoint
df = vaex.open('checkpoint.hdf5')
# Continue processing...
```

## Memory Management

Optimize memory usage for very large datasets:

### Memory-Mapped Files

```python
# HDF5 and Arrow files are memory-mapped (optimal)
df = vaex.open('data.hdf5')  # No memory used until accessed

# The file stays on disk; only accessed portions are loaded into RAM
mean = df.x.mean()  # Streams through the data, minimal memory
```

### Chunked Processing

```python
# Process a large DataFrame in chunks
chunk_size = 1_000_000

for i1, i2, chunk in df.to_pandas_df(chunk_size=chunk_size):
    # chunk is a pandas DataFrame holding rows [i1, i2)
    process_chunk(chunk)  # Careful: leaving Vaex defeats its purpose

# Better: Use Vaex operations directly (no chunking needed)
result = df.x.mean()  # Handles large data automatically
```

### Monitoring Memory Usage

```python
# Check the DataFrame memory footprint
print(df.byte_size())  # Bytes used by materialized columns

# Check memory per real (non-virtual) column
for col in df.get_column_names(virtual=False):
    print(f"{col}: {df[col].nbytes / 1e9:.2f} GB")

# Virtual columns cost no memory at all
print(list(df.virtual_columns))
```

## Parallel Computation

Vaex automatically parallelizes operations:

### Multithreading

```python
import vaex

# Vaex uses all CPU cores by default; check or set the thread count
# (set it before the first computation, the pool is created lazily)
print(vaex.multithreading.thread_count_default)
vaex.multithreading.thread_count_default = 8  # Use 8 threads

# Operations automatically parallelize
mean = df.x.mean()  # Uses all threads
```

### Distributed Computing with Dask

```python
import vaex

# Create a Vaex DataFrame
df_vaex = vaex.open('large_file.hdf5')

# Expose numeric columns lazily as a Dask array (rows x columns)
darr = df_vaex[['value']].to_dask_array()

# Process with Dask (e.g. on a distributed cluster)
result = darr.mean(axis=0).compute()

# Group-bys, however, are usually best done in Vaex itself:
grouped = df_vaex.groupby('category', agg={'value': 'sum'})
```

## JIT Compilation

Vaex can use Just-In-Time compilation for custom operations:

### Using Numba

```python
import vaex
import numba

# Define a JIT-compiled function (nopython mode)
@numba.njit
def custom_calculation(x, y):
    return x ** 2 + y ** 2

# Apply to the DataFrame; vectorize=True passes whole chunks (arrays)
df['custom'] = df.apply(custom_calculation,
                        arguments=[df.x, df.y],
                        vectorize=True)
```

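Vaex expressions can also compile themselves: the `jit_numba` method on an expression returns a Numba-accelerated version of the same formula. A short sketch, assuming numeric columns `x` and `y`:

```python
# Build the expression, then let Vaex JIT-compile it with Numba
df['r'] = df.x ** 2 + df.y ** 2
df['r_jit'] = df['r'].jit_numba()  # Same values, faster evaluation
```
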
### Custom Aggregations

```python
import numba

@numba.njit
def custom_sum(a):
    total = 0.0
    for val in a:
        total += val * 2  # Custom logic
    return total

# Vaex has no public hook for custom aggregations on expressions,
# so stream the column in chunks and combine the partial results
total = 0.0
for i1, i2, chunk in df.evaluate_iterator(df.x):
    total += custom_sum(chunk)
```

## Optimization Strategies

### Strategy 1: Minimize Materializations

```python
# Bad: Creates many materialized columns
df['a'] = (df.x + df.y).values
df['b'] = (df.a * 2).values
df['c'] = (df.b + df.z).values

# Good: Keep virtual until final export
df['a'] = df.x + df.y
df['b'] = df.a * 2
df['c'] = df.b + df.z
# Only materialize if exporting:
# df.export_hdf5('output.hdf5')
```

### Strategy 2: Use Selections Instead of Filtering

```python
# Less efficient: Creates new DataFrames
df_high = df[df.value > 100]
df_low = df[df.value <= 100]
mean_high = df_high.value.mean()
mean_low = df_low.value.mean()

# More efficient: Use selections
df.select(df.value > 100, name='high')
df.select(df.value <= 100, name='low')
mean_high = df.value.mean(selection='high')
mean_low = df.value.mean(selection='low')
```

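Statistics also accept a list of selections, so both segments can be computed in one pass; a short sketch, assuming the selections defined above:

```python
# One pass evaluates the aggregation for every named selection
mean_high, mean_low = df.mean(df.value, selection=['high', 'low'])
```
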
### Strategy 3: Batch Aggregations

```python
# Less efficient: Multiple passes
stats = {
    'mean': df.x.mean(),
    'std': df.x.std(),
    'min': df.x.min(),
    'max': df.x.max()
}

# More efficient: Single pass
delayed = {
    'mean': df.x.mean(delay=True),
    'std': df.x.std(delay=True),
    'min': df.x.min(delay=True),
    'max': df.x.max(delay=True)
}
df.execute()
stats = {name: task.get() for name, task in delayed.items()}
```

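Some statistics also come in fused forms; `minmax`, for instance, returns both extremes from a single task:

```python
# One task computes both extremes, instead of separate min and max passes
low, high = df.x.minmax()
```
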
### Strategy 4: Choose Optimal File Formats

```python
# Slow: Large CSV
df = vaex.from_csv('huge.csv')  # Can take minutes

# Fast: HDF5 or Arrow
df = vaex.open('huge.hdf5')   # Instant (memory-mapped)
df = vaex.open('huge.arrow')  # Instant (memory-mapped)

# One-time conversion
df = vaex.from_csv('huge.csv', convert='huge.hdf5')
# Future loads: vaex.open('huge.hdf5')
```

### Strategy 5: Optimize Expressions

```python
# Less efficient: Repeated calculations
df['result'] = df.x.log() + df.x.log() * 2

# More efficient: Reuse calculations via a virtual column
df['log_x'] = df.x.log()
df['result'] = df.log_x + df.log_x * 2

# Even better: Combine operations
df['result'] = df.x.log() * 3  # log(x) + 2*log(x) = 3*log(x)
```

## Performance Profiling

### Basic Profiling

```python
import time
import vaex

df = vaex.open('large_file.hdf5')

# Time operations
start = time.time()
result = df.x.mean()
elapsed = time.time() - start
print(f"Computed in {elapsed:.2f} seconds")
```

### Detailed Profiling

```python
# Vaex does not ship a profiler context manager, but long-running
# operations can report live progress while they execute
result = df.groupby('category').agg({'value': 'sum'})
mean = df.x.mean(progress=True)  # Shows a progress bar during the pass
```

### Benchmarking Patterns

```python
import time

# Compare strategies
def benchmark_operation(operation, name):
    start = time.time()
    result = operation()
    elapsed = time.time() - start
    print(f"{name}: {elapsed:.3f}s")
    return result

# Test different approaches
df.select(df.x > 0, name='positive')  # Define the named selection first
benchmark_operation(lambda: df.x.mean(), "Direct mean")
benchmark_operation(lambda: df[df.x > 0].x.mean(), "Filtered mean")
benchmark_operation(lambda: df.x.mean(selection='positive'), "Selection mean")
```

## Common Performance Issues and Solutions

### Issue: Slow Aggregations

```python
# Problem: Multiple separate aggregations, one pass each
for col in df.column_names:
    print(f"{col}: {df[col].mean()}")

# Solution: Batch with delay=True, one pass total
delayed = [df[col].mean(delay=True) for col in df.column_names]
df.execute()
for col, task in zip(df.column_names, delayed):
    print(f"{col}: {task.get()}")
```

### Issue: High Memory Usage

```python
# Problem: Materializing large virtual columns in RAM
df['large_col'] = complex_expression.values  # complex_expression is a stand-in

# Solution: Keep virtual, or materialize by exporting to disk
df['large_col'] = complex_expression  # Virtual
# Or: df.export_hdf5('with_new_col.hdf5')
```

### Issue: Slow Exports

```python
# Problem: Exporting with many virtual columns
df.export_csv('output.csv')  # Slow if many virtual columns

# Solution: Export to HDF5 or Arrow (faster)
df.export_hdf5('output.hdf5')
df.export_arrow('output.arrow')

# Or materialize first for CSV
df_materialized = df.materialize()
df_materialized.export_csv('output.csv')
```

### Issue: Repeated Complex Calculations

```python
# Problem: Complex virtual column recomputed by every aggregation
df['complex'] = df.x.log() * df.y.sqrt() + df.z ** 3
result1 = df.groupby('cat1').agg({'complex': 'mean'})
result2 = df.groupby('cat2').agg({'complex': 'sum'})
result3 = df.complex.std()

# Solution: Materialize once
df = df.materialize('complex')
# Or assign the evaluated values directly:
# df['complex'] = (df.x.log() * df.y.sqrt() + df.z ** 3).values
```

## Performance Best Practices Summary

1. **Use HDF5 or Arrow formats** - Orders of magnitude faster than CSV
2. **Leverage lazy evaluation** - Don't force computation until necessary
3. **Batch operations with delay=True** - Minimize passes through data
4. **Keep columns virtual** - Materialize only when beneficial
5. **Use selections, not filters** - More efficient for multiple segments
6. **Profile your code** - Identify bottlenecks before optimizing
7. **Avoid `.values` and `.to_pandas_df()`** - Keep operations in Vaex
8. **Parallelize naturally** - Vaex uses all cores automatically
9. **Export to efficient formats** - Checkpoint complex pipelines
10. **Optimize expressions** - Simplify math and reuse calculations

## Related Resources

- For DataFrame basics: See `core_dataframes.md`
- For data operations: See `data_processing.md`
- For file I/O optimization: See `io_operations.md`