---
description: Execute SQL queries with DataFusion against Parquet, CSV, and in-memory data
---
# DataFusion Query Execution
Help the user set up DataFusion and execute SQL queries against data stored in object storage (Parquet, CSV) or in-memory.
## Steps
1. **Add required dependencies**:
```toml
[dependencies]
datafusion = "39"
arrow = "52"
object_store = { version = "0.10", features = ["aws"] } # 0.10 matches DataFusion 39
url = "2"
num_cpus = "1"
futures = "0.3"
tokio = { version = "1", features = ["full"] }
```
2. **Create a DataFusion session context**:
```rust
use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use std::sync::Arc;

async fn create_context() -> Result<SessionContext> {
    // Configure the session
    let config = SessionConfig::new()
        .with_target_partitions(num_cpus::get()) // Match CPU count
        .with_batch_size(8192); // Rows per batch

    // Configure the runtime: a 4 GB memory pool (fraction 1.0 = use all of it),
    // spilling to /tmp/datafusion when the pool is exhausted
    let runtime_config = RuntimeConfig::new()
        .with_memory_limit(4 * 1024 * 1024 * 1024, 1.0)
        .with_temp_file_path("/tmp/datafusion");
    let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);

    Ok(SessionContext::new_with_config_rt(config, runtime))
}
```
3. **Register an object store** for S3/Azure/GCS:
```rust
use datafusion::error::{DataFusionError, Result};
use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use std::sync::Arc;
use url::Url;

async fn register_object_store(ctx: &SessionContext) -> Result<()> {
    // Build an S3 store from AWS_* environment variables
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name("my-data-lake")
        .build()?;

    // Register it so DataFusion can resolve s3:// paths in this bucket
    let url = Url::parse("s3://my-data-lake/")
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
    ctx.runtime_env().register_object_store(&url, Arc::new(s3));
    Ok(())
}
```
4. **Register Parquet tables**:
```rust
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::*;
use std::sync::Arc;

// Simple registration
async fn register_parquet_table(
    ctx: &SessionContext,
    table_name: &str,
    path: &str,
) -> Result<()> {
    ctx.register_parquet(table_name, path, ParquetReadOptions::default())
        .await?;
    Ok(())
}

// Advanced registration with partitioning and file statistics
async fn register_partitioned_table(
    ctx: &SessionContext,
    table_name: &str,
    path: &str,
) -> Result<()> {
    let table_path = ListingTableUrl::parse(path)?;
    let file_format = ParquetFormat::default();
    let listing_options = ListingOptions::new(Arc::new(file_format))
        .with_file_extension(".parquet")
        .with_target_partitions(ctx.state().config().target_partitions())
        .with_collect_stat(true); // Collect file statistics for planning

    // ListingTable::try_new fails without a schema, so infer one from the files
    let resolved_schema = listing_options
        .infer_schema(&ctx.state(), &table_path)
        .await?;

    let config = ListingTableConfig::new(table_path)
        .with_listing_options(listing_options)
        .with_schema(resolved_schema);
    let table = ListingTable::try_new(config)?;
    ctx.register_table(table_name, Arc::new(table))?;
    Ok(())
}
```
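If the files use Hive-style partition directories (e.g. `year=2024/month=01/`), declare the partition columns at registration time so they can be queried and pruned. A minimal sketch assuming integer `year`/`month` partitions; adjust the names and types to your layout:
```rust
use arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::prelude::*;

// Declares year/month as partition columns (assumed layout) so filters
// like WHERE year = 2024 can prune whole directories
async fn register_hive_partitioned(ctx: &SessionContext, path: &str) -> Result<()> {
    ctx.register_parquet(
        "events",
        path,
        ParquetReadOptions::default().table_partition_cols(vec![
            ("year".to_string(), DataType::Int32),
            ("month".to_string(), DataType::Int32),
        ]),
    ).await?;
    Ok(())
}
```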
5. **Execute SQL queries**:
```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;

async fn execute_sql(ctx: &SessionContext, query: &str) -> Result<Vec<RecordBatch>> {
    // Create a DataFrame from SQL, then collect all results into memory
    let df = ctx.sql(query).await?;
    let batches = df.collect().await?;
    Ok(batches)
}

// Example queries
async fn example_queries(ctx: &SessionContext) -> Result<()> {
    // Simple aggregation
    let df = ctx.sql("
        SELECT user_id, event_type, COUNT(*) as count
        FROM events
        WHERE date >= '2024-01-01'
        GROUP BY user_id, event_type
        ORDER BY count DESC
        LIMIT 100
    ").await?;
    df.show().await?;

    // Window functions
    let df = ctx.sql("
        SELECT
            user_id,
            timestamp,
            amount,
            SUM(amount) OVER (
                PARTITION BY user_id
                ORDER BY timestamp
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            ) as running_total
        FROM transactions
    ").await?;
    df.show().await?;

    // Joins
    let df = ctx.sql("
        SELECT
            e.user_id,
            u.name,
            COUNT(*) as event_count
        FROM events e
        JOIN users u ON e.user_id = u.id
        GROUP BY e.user_id, u.name
    ").await?;
    df.show().await?;
    Ok(())
}
```
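To inspect collected batches without going back through a DataFrame, Arrow's pretty printer renders them the same way `df.show()` does. A small sketch:
```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;

// Renders a slice of batches as an ASCII table
fn print_batches(batches: &[RecordBatch]) -> Result<()> {
    println!("{}", pretty_format_batches(batches)?);
    Ok(())
}
```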
6. **Use DataFrame API** as an alternative to SQL:
```rust
use datafusion::error::Result;
use datafusion::prelude::*;

async fn dataframe_api_examples(ctx: &SessionContext) -> Result<()> {
    // Get table
    let df = ctx.table("events").await?;

    // Filter
    let df = df.filter(col("timestamp").gt(lit("2024-01-01")))?;

    // Select columns
    let df = df.select(vec![
        col("user_id"),
        col("event_type"),
        col("timestamp"),
    ])?;

    // Aggregate (COUNT(*) is expressed as a count of a literal)
    let df = df.aggregate(
        vec![col("user_id"), col("event_type")],
        vec![
            count(lit(1)).alias("count"),
            avg(col("duration")).alias("avg_duration"),
            max(col("timestamp")).alias("max_time"),
        ],
    )?;

    // Sort: Expr::sort(ascending, nulls_first)
    let df = df.sort(vec![
        col("count").sort(false, false), // DESC NULLS LAST
    ])?;

    // Limit
    let df = df.limit(0, Some(100))?;

    // Execute
    let _batches = df.collect().await?;
    Ok(())
}
```
7. **Stream results** for large queries:
```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::*;
use futures::stream::StreamExt;

async fn stream_query_results(ctx: &SessionContext, query: &str) -> Result<()> {
    let df = ctx.sql(query).await?;

    // Get streaming results instead of collecting everything into memory
    let mut stream = df.execute_stream().await?;

    // Process batches incrementally
    let mut total_rows = 0;
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        total_rows += batch.num_rows();
        process_batch(&batch)?;
        println!("Processed {} rows so far...", total_rows);
    }
    println!("Total rows: {}", total_rows);
    Ok(())
}

fn process_batch(batch: &RecordBatch) -> Result<()> {
    // Your processing logic
    let _ = batch;
    Ok(())
}
```
8. **Inspect query plans** for optimization:
```rust
use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

async fn explain_query(ctx: &SessionContext, query: &str) -> Result<()> {
    // Logical plan
    let logical_plan = ctx.sql(query).await?.into_optimized_plan()?;
    println!("Logical Plan:\n{}", logical_plan.display_indent());

    // Physical plan (ExecutionPlan has no display method of its own;
    // wrap it with `displayable`)
    let df = ctx.sql(query).await?;
    let physical_plan = df.create_physical_plan().await?;
    println!("Physical Plan:\n{}", displayable(physical_plan.as_ref()).indent(false));

    // Or use EXPLAIN in SQL
    let df = ctx.sql(&format!("EXPLAIN {}", query)).await?;
    df.show().await?;
    Ok(())
}
```
## Advanced Features
**Register CSV tables**:
```rust
use datafusion::error::Result;
use datafusion::prelude::*;

async fn register_csv(ctx: &SessionContext) -> Result<()> {
    ctx.register_csv(
        "users",
        "s3://my-bucket/users.csv",
        CsvReadOptions::new()
            .has_header(true)
            .delimiter(b',')
            .schema_infer_max_records(1000),
    ).await?;
    Ok(())
}
```
**Register in-memory tables**:
```rust
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::*;
use std::sync::Arc;

async fn register_memory_table(
    ctx: &SessionContext,
    name: &str,
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
) -> Result<()> {
    // One inner Vec per partition; a single partition here
    let mem_table = MemTable::try_new(schema, vec![batches])?;
    ctx.register_table(name, Arc::new(mem_table))?;
    Ok(())
}
```
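To exercise `register_memory_table`, a `RecordBatch` can be built directly from Arrow arrays. A sketch with made-up columns:
```rust
use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::sync::Arc;

// Builds a two-column batch of sample data (hypothetical schema)
fn sample_batch() -> Result<RecordBatch, arrow::error::ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]));
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![1, 2, 3])),
            Arc::new(StringArray::from(vec!["a", "b", "c"])),
        ],
    )
}
```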
**Create temporary views**:
```rust
async fn create_view(ctx: &SessionContext) -> Result<()> {
    // Create a view from a query
    let df = ctx.sql("
        SELECT user_id, COUNT(*) as count
        FROM events
        GROUP BY user_id
    ").await?;
    ctx.register_table("user_counts", df.into_view())?;

    // Now query the view
    let results = ctx.sql("SELECT * FROM user_counts WHERE count > 100").await?;
    results.show().await?;
    Ok(())
}
```
**User-Defined Functions (UDFs)**:
```rust
use arrow::array::StringArray;
use arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::{create_udf, ColumnarValue, Volatility};
use datafusion::prelude::*;
use std::sync::Arc;

async fn register_udfs(ctx: &SessionContext) -> Result<()> {
    // Create a scalar UDF: Utf8 -> Utf8
    let extract_domain = create_udf(
        "extract_domain",
        vec![DataType::Utf8],
        Arc::new(DataType::Utf8),
        Volatility::Immutable,
        Arc::new(|args: &[ColumnarValue]| {
            // Materialize the argument as an array (scalars broadcast)
            let urls = args[0].clone().into_array(1)?;
            let urls = urls.as_any().downcast_ref::<StringArray>().unwrap();
            let domains: StringArray = urls
                .iter()
                .map(|maybe_url| {
                    maybe_url
                        .and_then(|u| url::Url::parse(u).ok())
                        .and_then(|u| u.host_str().map(|s| s.to_string()))
                })
                .collect();
            Ok(ColumnarValue::Array(Arc::new(domains)))
        }),
    );
    ctx.register_udf(extract_domain);

    // Use it in a query
    let df = ctx.sql("
        SELECT
            extract_domain(url) as domain,
            COUNT(*) as count
        FROM events
        GROUP BY domain
    ").await?;
    df.show().await?;
    Ok(())
}
```
**Write query results to Parquet**:
```rust
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::*;

async fn write_query_results(
    ctx: &SessionContext,
    query: &str,
    output_path: &str,
) -> Result<()> {
    let df = ctx.sql(query).await?;

    // Write to Parquet with ZSTD level-3 compression
    let mut parquet_options = TableParquetOptions::default();
    parquet_options.global.compression = Some("zstd(3)".to_string());
    df.write_parquet(
        output_path,
        DataFrameWriteOptions::new(),
        Some(parquet_options),
    ).await?;
    Ok(())
}
```
## Performance Optimization
**Partition pruning**:
```rust
// DataFusion automatically prunes partitions based on WHERE clauses
async fn partition_pruning_example(ctx: &SessionContext) -> Result<()> {
    // Assuming Hive-style partitioning (year=2024/month=01/...),
    // this query only scans the year=2024/month=01 partitions
    let _df = ctx.sql("
        SELECT * FROM events
        WHERE year = 2024 AND month = 1
    ").await?;

    // Use EXPLAIN to verify partition pruning
    let explain = ctx.sql("EXPLAIN SELECT * FROM events WHERE year = 2024 AND month = 1").await?;
    explain.show().await?;
    Ok(())
}
```
**Predicate pushdown**:
```rust
// DataFusion pushes predicates down to the Parquet reader automatically,
// reading only the row groups whose statistics match the filter
let df = ctx.sql("
    SELECT * FROM events
    WHERE user_id = 'user123'
      AND timestamp >= '2024-01-01'
").await?;
```
**Projection pushdown**:
```rust
// Only requested columns are read from Parquet
let df = ctx.sql("
    SELECT user_id, timestamp
    FROM events
").await?; // Only reads the user_id and timestamp columns
```
**Parallelism tuning**:
```rust
let config = SessionConfig::new()
    .with_target_partitions(16); // Increase for better parallelism
let ctx = SessionContext::new_with_config(config);
```
## Common Patterns
**Aggregating across partitions**:
```rust
async fn aggregate_partitions(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("
        SELECT
            year,
            month,
            COUNT(*) as total_events,
            COUNT(DISTINCT user_id) as unique_users,
            AVG(duration) as avg_duration
        FROM events
        WHERE year = 2024
        GROUP BY year, month
        ORDER BY month
    ").await?;
    df.show().await?;
    Ok(())
}
```
**Time-series analysis**:
```rust
async fn time_series_analysis(ctx: &SessionContext) -> Result<()> {
    // DataFusion exposes approximate percentiles as APPROX_PERCENTILE_CONT(expr, pct)
    let df = ctx.sql("
        SELECT
            DATE_TRUNC('hour', timestamp) as hour,
            COUNT(*) as events_per_hour,
            AVG(value) as avg_value,
            APPROX_PERCENTILE_CONT(value, 0.95) as p95_value
        FROM metrics
        WHERE timestamp >= NOW() - INTERVAL '7 days'
        GROUP BY 1
        ORDER BY 1
    ").await?;
    df.show().await?;
    Ok(())
}
```
**Complex joins**:
```rust
async fn complex_join(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("
        SELECT
            e.event_type,
            u.country,
            COUNT(*) as count,
            AVG(e.duration) as avg_duration
        FROM events e
        JOIN users u ON e.user_id = u.id
        LEFT JOIN subscriptions s ON u.id = s.user_id
        WHERE e.timestamp >= '2024-01-01'
          AND u.active = true
        GROUP BY e.event_type, u.country
        HAVING COUNT(*) > 100
        ORDER BY count DESC
    ").await?;
    df.show().await?;
    Ok(())
}
```
## Best Practices
- **Use partition pruning** by filtering on partition columns (year, month, day)
- **Select only needed columns** to leverage projection pushdown
- **Configure appropriate parallelism** based on CPU cores and data size
- **Use EXPLAIN** to verify query optimization
- **Stream large results** instead of collecting all at once
- **Collect statistics** when registering tables (`with_collect_stat(true)`) for better query planning
- **Create views** for commonly used queries
- **Use UDFs** for custom business logic
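
Putting these practices together, here is a minimal end-to-end sketch (reusing the placeholder bucket and table names from the steps above):
```rust
use datafusion::error::Result;
use datafusion::prelude::*;
use object_store::aws::AmazonS3Builder;
use std::sync::Arc;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Register the object store, then the table (placeholder names)
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name("my-data-lake")
        .build()?;
    let url = Url::parse("s3://my-data-lake/").expect("static URL is valid");
    ctx.runtime_env().register_object_store(&url, Arc::new(s3));
    ctx.register_parquet(
        "events",
        "s3://my-data-lake/events/",
        ParquetReadOptions::default(),
    ).await?;

    // Select only needed columns and filter on partition columns
    let df = ctx.sql("
        SELECT user_id, COUNT(*) as count
        FROM events
        WHERE year = 2024
        GROUP BY user_id
        ORDER BY count DESC
        LIMIT 10
    ").await?;
    df.show().await?;
    Ok(())
}
```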
## Troubleshooting
**Out of memory**:
- Reduce the batch size: `.with_batch_size(4096)`
- Set a memory limit: `.with_memory_limit(bytes, fraction)`
- Stream results instead of collecting
- Enable spilling to disk with `.with_temp_file_path()` (combined sketch below)
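
A minimal sketch combining these remedies, assuming a 1 GB memory budget and a /tmp spill directory are acceptable:
```rust
use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use std::sync::Arc;

// Smaller batches, a hard memory cap, and disk spilling when the cap is hit
fn memory_constrained_context() -> Result<SessionContext> {
    let config = SessionConfig::new().with_batch_size(4096);
    let runtime = Arc::new(RuntimeEnv::new(
        RuntimeConfig::new()
            .with_memory_limit(1024 * 1024 * 1024, 1.0) // 1 GB pool
            .with_temp_file_path("/tmp/datafusion-spill"),
    )?);
    Ok(SessionContext::new_with_config_rt(config, runtime))
}
```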
**Slow queries**:
- Use EXPLAIN to inspect the query plan (see the `EXPLAIN ANALYZE` sketch below)
- Verify that partition pruning is working
- Check whether predicates can be pushed down
- Increase parallelism: `.with_target_partitions()`
- Ensure the object store is registered correctly
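
Beyond plain `EXPLAIN`, DataFusion supports `EXPLAIN ANALYZE`, which executes the query and annotates each operator with runtime metrics. A small sketch, assuming the `events` table from earlier:
```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Prints per-operator metrics (rows, elapsed time) to locate the slow stage
async fn profile_query(ctx: &SessionContext) -> Result<()> {
    let df = ctx
        .sql("EXPLAIN ANALYZE SELECT user_id, COUNT(*) FROM events GROUP BY user_id")
        .await?;
    df.show().await?;
    Ok(())
}
```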
**Schema errors**:
- Verify table registration: `ctx.table("name").await?.schema()`
- Check for schema evolution across Parquet files
- Use an explicit schema for CSV files (sketch below)
- Handle NULL values appropriately
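
A minimal sketch of registering a CSV file with an explicit schema instead of relying on inference (the column names and path are placeholders):
```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::prelude::*;

async fn register_csv_with_schema(ctx: &SessionContext) -> Result<()> {
    // Declare the schema up front so inference can't guess wrong
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]);
    ctx.register_csv(
        "users",
        "users.csv",
        CsvReadOptions::new().has_header(true).schema(&schema),
    ).await?;
    Ok(())
}
```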
**Partition not found**:
- Verify the path format matches Hive partitioning
- Check the object store URL registration
- List files under the prefix to debug (sketch below); note that `store.list()` returns a stream, not a future
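
A small debugging sketch using `object_store` directly; `list` returns a stream rather than a future (the bucket and prefix are placeholders):
```rust
use futures::TryStreamExt;
use object_store::{aws::AmazonS3Builder, path::Path, ObjectStore};

// Lists every object under a prefix to check what the table path contains
async fn list_prefix() -> object_store::Result<()> {
    let store = AmazonS3Builder::from_env()
        .with_bucket_name("my-data-lake")
        .build()?;
    let prefix = Path::from("events/year=2024/month=01");
    let mut objects = store.list(Some(&prefix));
    while let Some(meta) = objects.try_next().await? {
        println!("{} ({} bytes)", meta.location, meta.size);
    }
    Ok(())
}
```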