# Dask DataFrames

## Overview

Dask DataFrames enable parallel processing of large tabular data by distributing work across multiple pandas DataFrames. As described in the documentation, "Dask DataFrames are a collection of many pandas DataFrames" with a largely identical API, making the transition from pandas straightforward.

## Core Concept

A Dask DataFrame is divided into multiple pandas DataFrames (partitions) along the index, as the sketch below illustrates:

- Each partition is a regular pandas DataFrame
- Operations are applied to each partition in parallel
- Results are combined automatically
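
For a concrete picture, here is a minimal sketch of how a pandas DataFrame maps onto Dask partitions; the column names and partition count are illustrative, not taken from the text above:

```python
import pandas as pd
import dask.dataframe as dd

# A small illustrative pandas DataFrame (hypothetical columns)
pdf = pd.DataFrame({'x': range(8), 'y': list('aabbccdd')})

# Wrap it as a Dask DataFrame split into 4 partitions along the index
ddf = dd.from_pandas(pdf, npartitions=4)

print(ddf.npartitions)                        # 4
print(type(ddf.get_partition(0).compute()))   # pandas.core.frame.DataFrame

# Operations run per partition; results are combined when computed
print(ddf['x'].sum().compute())               # 28
```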

## Key Capabilities

### Scale

- Process 100 GiB on a laptop
- Process 100 TiB on a cluster
- Handle datasets exceeding available RAM

### Compatibility

- Implements most of the pandas API
- Easy transition from pandas code
- Works with familiar operations (see the comparison below)
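
As a rough illustration of that API overlap, the same aggregation reads nearly identically in pandas and Dask; the file name and column names below are hypothetical:

```python
import pandas as pd
import dask.dataframe as dd

# pandas: eager, everything in memory (file and columns are hypothetical)
pdf = pd.read_csv('events.csv')
pandas_result = pdf.groupby('category')['value'].mean()

# Dask: the same expression, evaluated lazily across partitions
ddf = dd.read_csv('events.csv')
dask_result = ddf.groupby('category')['value'].mean().compute()  # returns a pandas Series
```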

## When to Use Dask DataFrames

**Use Dask When**:

- The dataset exceeds available RAM
- Computations take significant time and pandas-level optimization hasn't helped
- You need to scale from a pandas prototype to production on larger data
- You are working with multiple files that should be processed together

**Stick with Pandas When**:

- Data fits comfortably in memory
- Computations complete in subseconds
- Operations are simple and don't require custom `.apply()` functions
- You are doing iterative development and exploration

## Reading Data

Dask mirrors pandas reading syntax with added support for multiple files:

### Single File

```python
import dask.dataframe as dd

# Read a single file
ddf = dd.read_csv('data.csv')
ddf = dd.read_parquet('data.parquet')
```

### Multiple Files

```python
# Read multiple files using glob patterns
ddf = dd.read_csv('data/*.csv')
ddf = dd.read_parquet('s3://mybucket/data/*.parquet')

# Read files organized in a partitioned path structure
ddf = dd.read_parquet('data/year=*/month=*/day=*.parquet')
```

### Optimizations

```python
# Specify columns to read (reduces memory)
ddf = dd.read_parquet('data.parquet', columns=['col1', 'col2'])

# Control partitioning
ddf = dd.read_csv('data.csv', blocksize='64MB')  # Creates ~64MB partitions
```

## Common Operations

All operations are lazy until `.compute()` is called.

### Filtering

```python
# Same as pandas
filtered = ddf[ddf['column'] > 100]
filtered = ddf.query('column > 100')
```

### Column Operations

```python
# Add columns
ddf['new_column'] = ddf['col1'] + ddf['col2']

# Select columns
subset = ddf[['col1', 'col2', 'col3']]

# Drop columns
ddf = ddf.drop(columns=['unnecessary_col'])
```

### Aggregations

```python
# Standard aggregations work as expected
mean = ddf['column'].mean().compute()
sum_total = ddf['column'].sum().compute()
counts = ddf['category'].value_counts().compute()
```

### GroupBy

```python
# GroupBy operations (may require a shuffle)
grouped = ddf.groupby('category')['value'].mean().compute()

# Multiple aggregations
agg_result = ddf.groupby('category').agg({
    'value': ['mean', 'sum', 'count'],
    'amount': 'sum'
}).compute()
```

### Joins and Merges

```python
# Merge DataFrames on a common column
merged = dd.merge(ddf1, ddf2, on='key', how='left')

# Join the 'key' column of ddf1 against the index of ddf2
joined = ddf1.join(ddf2, on='key')
```

### Sorting

```python
# Sorting (expensive operation, requires data movement)
sorted_ddf = ddf.sort_values('column')
result = sorted_ddf.compute()
```

## Custom Operations

### Apply Functions

**To Partitions (Efficient)**:

```python
# Apply a function to entire partitions
def custom_partition_function(partition_df):
    # partition_df is a regular pandas DataFrame
    return partition_df.assign(new_col=partition_df['col1'] * 2)

ddf = ddf.map_partitions(custom_partition_function)
```

**To Rows (Less Efficient)**:

```python
# Apply to each row (creates many small tasks)
ddf['result'] = ddf.apply(lambda row: custom_function(row), axis=1, meta=('result', 'float64'))
```

**Note**: Always prefer `map_partitions` over row-wise `apply` for better performance.

### Meta Parameter

When Dask can't infer the output structure, specify the `meta` parameter:

```python
import pandas as pd

# For apply operations
ddf['new'] = ddf.apply(func, axis=1, meta=('new', 'float64'))

# For map_partitions
ddf = ddf.map_partitions(func, meta=pd.DataFrame({
    'col1': pd.Series(dtype='float64'),
    'col2': pd.Series(dtype='int64')
}))
```

## Lazy Evaluation and Computation

### Lazy Operations

```python
# These operations are lazy (they return instantly, no computation happens)
filtered = ddf[ddf['value'] > 100]
aggregated = filtered.groupby('category').mean()
final = aggregated[aggregated['value'] < 500]

# Nothing has been computed yet
```

### Triggering Computation

```python
import dask

# Compute a single result
result = final.compute()

# Compute multiple results in one pass (shared work is only done once)
result1, result2, result3 = dask.compute(
    operation1,
    operation2,
    operation3
)
```

### Persist in Memory

```python
# Keep results in distributed memory for reuse
ddf_cached = ddf.persist()

# Now multiple operations on ddf_cached won't recompute the earlier work
result1 = ddf_cached.mean().compute()
result2 = ddf_cached.sum().compute()
```

## Index Management

### Setting Index

```python
# Set an index (required for efficient joins and certain operations)
# sorted=True declares that the column is already sorted, so no shuffle is needed
ddf = ddf.set_index('timestamp', sorted=True)
```

### Index Properties

- A sorted index enables efficient filtering and joins
- The index determines how data is partitioned
- Some operations perform better with an appropriate index (see the sketch below)
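
A minimal sketch of why the index matters, assuming a DataFrame with a sortable `timestamp` column; the date range below is hypothetical:

```python
# Assumes ddf has a sorted 'timestamp' column set as its index
ddf = ddf.set_index('timestamp', sorted=True)

# With known divisions (partition boundaries), Dask can prune partitions
print(ddf.known_divisions)   # True when boundaries are known
print(ddf.divisions)         # tuple of partition boundary values

# A range selection on the index only touches the partitions that overlap the range
january = ddf.loc['2024-01-01':'2024-01-31'].compute()
```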

## Writing Results

### To Files

```python
# Write to multiple files (one per partition)
ddf.to_parquet('output/data.parquet')
ddf.to_csv('output/data-*.csv')

# Write to a single file (forces computation and concatenation in memory)
ddf.compute().to_csv('output/single_file.csv')
```

### To Memory (Pandas)

```python
# Convert to pandas (loads all data in memory)
pdf = ddf.compute()
```

## Performance Considerations

### Efficient Operations

- Column selection and filtering: very efficient
- Simple aggregations (sum, mean, count): efficient
- Row-wise logic applied per partition: efficient with `map_partitions`

### Expensive Operations

- Sorting: requires shuffling data across workers
- GroupBy with many groups: may require a shuffle (see the `split_out` sketch below)
- Complex joins: cost depends on data distribution
- Row-wise `apply`: creates many small tasks
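
One hedged mitigation for many-group aggregations is the `split_out` argument, which spreads the aggregated result over several output partitions instead of one. This is a sketch: `user_id` and the partition count are illustrative, and support for `split_out` depends on the Dask version:

```python
# Spread a many-group aggregation across 8 output partitions
# instead of collecting every group into a single partition
per_user = ddf.groupby('user_id')['value'].sum(split_out=8)
result = per_user.compute()
```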

### Optimization Tips

**1. Select Columns Early**

```python
# Better: read only the columns you need
ddf = dd.read_parquet('data.parquet', columns=['col1', 'col2'])
```

**2. Filter Before GroupBy**

```python
# Better: reduce the data before expensive operations
result = ddf[ddf['year'] == 2024].groupby('category').sum().compute()
```

**3. Use Efficient File Formats**

```python
# Prefer Parquet over CSV: faster, smaller, and columnar
ddf.to_parquet('data.parquet')
```

**4. Repartition Appropriately**

```python
# If partitions are too small, combine them
ddf = ddf.repartition(npartitions=10)

# If partitions are too large, split them by target size
ddf = ddf.repartition(partition_size='100MB')
```

## Common Patterns

### ETL Pipeline

```python
import dask.dataframe as dd

# Read data
ddf = dd.read_csv('raw_data/*.csv')

# Transform
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])

# Aggregate
summary = ddf.groupby('category').agg({
    'amount': ['sum', 'mean'],
    'quantity': 'count'
})

# Flatten the MultiIndex columns produced by agg so Parquet can store them
summary.columns = ['_'.join(col) for col in summary.columns]

# Write results (to_parquet triggers the computation)
summary.to_parquet('output/summary.parquet')
```

### Time Series Analysis

```python
# Read time series data
ddf = dd.read_parquet('timeseries/*.parquet')

# Set a sorted timestamp index
ddf = ddf.set_index('timestamp', sorted=True)

# Resample to hourly means (requires a datetime index)
hourly = ddf.resample('1H').mean()

# Compute statistics
result = hourly.compute()
```

### Combining Multiple Files

```python
# Read multiple files as a single DataFrame
ddf = dd.read_csv('data/2024-*.csv')

# Process the combined data
result = ddf.groupby('category')['value'].sum().compute()
```

## Limitations and Differences from Pandas

### Not All Pandas Features Available

Some pandas operations are not implemented in Dask:

- Some string methods
- Certain window functions
- Some specialized statistical functions

### Partitioning Matters

- Operations within a partition are efficient
- Operations that cross partitions may be expensive
- Index-based operations benefit from a sorted index

### Lazy Evaluation

- Operations don't execute until `.compute()` is called
- Be aware of which calls trigger computation (see the sketch below)
- Intermediate results can't be inspected without computing them
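
A short sketch of which calls trigger computation, assuming a Dask DataFrame `ddf` with a hypothetical `value` column:

```python
# Lazy: builds a task graph and returns immediately
lazy_mean = ddf['value'].mean()

# Each of these triggers real computation
result = lazy_mean.compute()   # explicit
preview = ddf.head()           # pulls rows from the first partition(s)
n_rows = len(ddf)              # counts rows across every partition
```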

## Debugging Tips

### Inspect Partitions

```python
# Get the number of partitions
print(ddf.npartitions)

# Compute a single partition
first_partition = ddf.get_partition(0).compute()

# View the first few rows (computes from the first partition)
print(ddf.head())
```

### Validate Operations on Small Data

```python
# Test on a small sample first (head() returns a pandas DataFrame)
sample = ddf.head(1000)

# Validate that the logic works on the sample...

# ...then scale the same operations to the full dataset
result = ddf.compute()
```

### Check Dtypes

```python
# Verify data types are correct (dtypes are known without computing)
print(ddf.dtypes)
```