| description |
|---|
| Execute SQL queries with DataFusion against Parquet, CSV, and in-memory data |
# DataFusion Query Execution
Help the user set up DataFusion and execute SQL queries against data stored in object storage (Parquet, CSV) or in memory.
## Steps

- Add required dependencies:

```toml
[dependencies]
datafusion = "39"
arrow = "52"
object_store = "0.9"
tokio = { version = "1", features = ["full"] }
# Also used by the examples below
url = "2"
num_cpus = "1"
futures = "0.3"
```
- Create a DataFusion session context:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

async fn create_context() -> Result<SessionContext> {
    // Configure the session
    let config = SessionConfig::new()
        .with_target_partitions(num_cpus::get()) // Match CPU count
        .with_batch_size(8192); // Rows per batch

    // Configure the runtime: a 4 GB memory limit (no over-allocation)
    // and a spill directory for operators that exceed it
    let runtime_config = RuntimeConfig::new()
        .with_memory_limit(4 * 1024 * 1024 * 1024, 1.0)
        .with_disk_manager(DiskManagerConfig::new_specified(vec![
            "/tmp/datafusion".into(),
        ]));
    let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);

    Ok(SessionContext::new_with_config_rt(config, runtime))
}
```
- Register an object store for S3/Azure/GCS:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use object_store::aws::AmazonS3Builder;

async fn register_object_store(ctx: &SessionContext) -> Result<()> {
    // Create the S3 store; credentials and region come from the environment
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name("my-data-lake")
        .build()?;

    // Register it with DataFusion so s3://my-data-lake/... paths resolve
    let url = url::Url::parse("s3://my-data-lake/").expect("valid URL");
    ctx.runtime_env().register_object_store(&url, Arc::new(s3));
    Ok(())
}
```
- Register Parquet tables:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

// Simple registration
async fn register_parquet_table(
    ctx: &SessionContext,
    table_name: &str,
    path: &str,
) -> Result<()> {
    ctx.register_parquet(table_name, path, ParquetReadOptions::default())
        .await?;
    Ok(())
}

// Advanced registration with listing options and file statistics
async fn register_partitioned_table(
    ctx: &SessionContext,
    table_name: &str,
    path: &str,
) -> Result<()> {
    let table_path = ListingTableUrl::parse(path)?;
    let file_format = ParquetFormat::default();

    let listing_options = ListingOptions::new(Arc::new(file_format))
        .with_file_extension(".parquet")
        .with_target_partitions(ctx.state().config().target_partitions())
        .with_collect_stat(true); // Collect file statistics for planning

    // The config needs a schema; infer it from the files
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(listing_options)
        .infer_schema(&ctx.state())
        .await?;

    let table = ListingTable::try_new(config)?;
    ctx.register_table(table_name, Arc::new(table))?;
    Ok(())
}
```
- Execute SQL queries:

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn execute_sql(ctx: &SessionContext, query: &str) -> Result<Vec<RecordBatch>> {
    // Create a DataFrame from SQL
    let df = ctx.sql(query).await?;
    // Collect all results into memory
    let batches = df.collect().await?;
    Ok(batches)
}

// Example queries
async fn example_queries(ctx: &SessionContext) -> Result<()> {
    // Simple aggregation
    let df = ctx.sql("
        SELECT user_id, event_type, COUNT(*) as count
        FROM events
        WHERE date >= '2024-01-01'
        GROUP BY user_id, event_type
        ORDER BY count DESC
        LIMIT 100
    ").await?;
    df.show().await?;

    // Window functions
    let df = ctx.sql("
        SELECT
            user_id,
            timestamp,
            amount,
            SUM(amount) OVER (
                PARTITION BY user_id
                ORDER BY timestamp
                ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            ) as running_total
        FROM transactions
    ").await?;
    df.show().await?;

    // Joins
    let df = ctx.sql("
        SELECT e.user_id, u.name, COUNT(*) as event_count
        FROM events e
        JOIN users u ON e.user_id = u.id
        GROUP BY e.user_id, u.name
    ").await?;
    df.show().await?;

    Ok(())
}
```
- Use the DataFrame API as an alternative to SQL:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

async fn dataframe_api_examples(ctx: &SessionContext) -> Result<()> {
    // Get the table as a DataFrame
    let df = ctx.table("events").await?;

    // Filter
    let df = df.filter(col("timestamp").gt(lit("2024-01-01")))?;

    // Select columns
    let df = df.select(vec![
        col("user_id"),
        col("event_type"),
        col("timestamp"),
    ])?;

    // Aggregate
    let df = df.aggregate(
        vec![col("user_id"), col("event_type")],
        vec![
            count(lit(1)).alias("count"), // COUNT(*) equivalent
            avg(col("duration")).alias("avg_duration"),
            max(col("timestamp")).alias("max_time"),
        ],
    )?;

    // Sort: descending, nulls last
    let df = df.sort(vec![col("count").sort(false, false)])?;

    // Limit
    let df = df.limit(0, Some(100))?;

    // Execute
    let _batches = df.collect().await?;
    Ok(())
}
```
- Stream results for large queries:

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use futures::stream::StreamExt;

async fn stream_query_results(ctx: &SessionContext, query: &str) -> Result<()> {
    let df = ctx.sql(query).await?;

    // Get a stream of RecordBatches instead of collecting everything
    let mut stream = df.execute_stream().await?;

    // Process batches incrementally
    let mut total_rows = 0;
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        total_rows += batch.num_rows();
        process_batch(&batch)?;
        println!("Processed {} rows so far...", total_rows);
    }

    println!("Total rows: {}", total_rows);
    Ok(())
}

fn process_batch(batch: &RecordBatch) -> Result<()> {
    // Your processing logic
    let _ = batch;
    Ok(())
}
```
- Inspect query plans for optimization:

```rust
use datafusion::error::Result;
use datafusion::physical_plan::displayable;
use datafusion::prelude::SessionContext;

async fn explain_query(ctx: &SessionContext, query: &str) -> Result<()> {
    // Optimized logical plan
    let logical_plan = ctx.sql(query).await?.into_optimized_plan()?;
    println!("Logical Plan:\n{}", logical_plan.display_indent());

    // Physical plan
    let df = ctx.sql(query).await?;
    let physical_plan = df.create_physical_plan().await?;
    println!(
        "Physical Plan:\n{}",
        displayable(physical_plan.as_ref()).indent(false)
    );

    // Or use EXPLAIN in SQL
    let df = ctx.sql(&format!("EXPLAIN {}", query)).await?;
    df.show().await?;
    Ok(())
}
```
## Advanced Features
Register CSV tables:

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

async fn register_csv(ctx: &SessionContext) -> Result<()> {
    ctx.register_csv(
        "users",
        "s3://my-bucket/users.csv",
        CsvReadOptions::new()
            .has_header(true)
            .delimiter(b',')
            .schema_infer_max_records(1000),
    ).await?;
    Ok(())
}
```
Register in-memory tables:

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn register_memory_table(
    ctx: &SessionContext,
    name: &str,
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
) -> Result<()> {
    // One inner Vec per partition; here everything goes in one partition
    let mem_table = MemTable::try_new(schema, vec![batches])?;
    ctx.register_table(name, Arc::new(mem_table))?;
    Ok(())
}
```
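A minimal usage sketch for the helper above; the `users_mem` table name and the one-column sample data are just illustrative:

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn memory_table_example(ctx: &SessionContext) -> Result<()> {
    // Build a one-column batch by hand
    let schema = Arc::new(Schema::new(vec![Field::new(
        "user_id",
        DataType::Int64,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;

    register_memory_table(ctx, "users_mem", vec![batch], schema).await?;

    // The in-memory table is queryable like any other
    ctx.sql("SELECT COUNT(*) FROM users_mem").await?.show().await?;
    Ok(())
}
```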
Create temporary views:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn create_view(ctx: &SessionContext) -> Result<()> {
    // Create a view from a query
    let df = ctx.sql("
        SELECT user_id, COUNT(*) as count
        FROM events
        GROUP BY user_id
    ").await?;
    ctx.register_table("user_counts", df.into_view())?;

    // Now query the view
    let results = ctx.sql("SELECT * FROM user_counts WHERE count > 100").await?;
    results.show().await?;
    Ok(())
}
```
User-Defined Functions (UDFs):

```rust
use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::{create_udf, ColumnarValue, Volatility};
use datafusion::prelude::SessionContext;

async fn register_udfs(ctx: &SessionContext) -> Result<()> {
    // Create a scalar UDF: extract the host from a URL string
    let extract_domain = create_udf(
        "extract_domain",
        vec![DataType::Utf8],
        Arc::new(DataType::Utf8),
        Volatility::Immutable,
        Arc::new(|args: &[ColumnarValue]| {
            // Materialize the argument as an array (length 1 if it was a scalar)
            let urls = args[0].clone().into_array(1)?;
            let urls = urls.as_any().downcast_ref::<StringArray>().unwrap();
            let domains: StringArray = urls
                .iter()
                .map(|u| {
                    u.and_then(|u| url::Url::parse(u).ok())
                        .and_then(|u| u.host_str().map(|s| s.to_string()))
                })
                .collect();
            Ok(ColumnarValue::Array(Arc::new(domains)))
        }),
    );
    ctx.register_udf(extract_domain);

    // Use it in a query
    let df = ctx.sql("
        SELECT extract_domain(url) as domain, COUNT(*) as count
        FROM events
        GROUP BY domain
    ").await?;
    df.show().await?;
    Ok(())
}
```
Write query results to Parquet:

```rust
use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn write_query_results(
    ctx: &SessionContext,
    query: &str,
    output_path: &str,
) -> Result<()> {
    let df = ctx.sql(query).await?;

    // Write to Parquet with ZSTD compression (level 3)
    let mut parquet_options = TableParquetOptions::default();
    parquet_options.global.compression = Some("zstd(3)".to_string());

    df.write_parquet(
        output_path,
        DataFrameWriteOptions::new(),
        Some(parquet_options),
    ).await?;
    Ok(())
}
```
## Performance Optimization
Partition pruning:

```rust
// DataFusion prunes partitions based on WHERE clauses over partition columns
async fn partition_pruning_example(ctx: &SessionContext) -> Result<()> {
    // Assuming Hive-style partitioning: year=2024/month=01/...
    // This query only scans the year=2024/month=01 partitions
    let _df = ctx.sql("
        SELECT * FROM events
        WHERE year = 2024 AND month = 1
    ").await?;

    // Use EXPLAIN to verify partition pruning
    let explain = ctx
        .sql("EXPLAIN SELECT * FROM events WHERE year = 2024 AND month = 1")
        .await?;
    explain.show().await?;
    Ok(())
}
```
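Note that pruning on `year`/`month` requires those partition columns to have been declared when the table was registered. A sketch using `ListingOptions::with_table_partition_cols`, assuming Int32-typed Hive partition columns (adjust names and types to your layout):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;

// Build listing options that declare the Hive partition columns,
// so filters on year/month can prune whole directories
fn partitioned_listing_options() -> ListingOptions {
    ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_extension(".parquet")
        .with_table_partition_cols(vec![
            ("year".to_string(), DataType::Int32),
            ("month".to_string(), DataType::Int32),
        ])
}
```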
Predicate pushdown:

```rust
// DataFusion pushes predicates down to the Parquet reader automatically,
// so only row groups whose statistics match the filter are read
let df = ctx.sql("
    SELECT * FROM events
    WHERE user_id = 'user123'
      AND timestamp >= '2024-01-01'
").await?;
```
Projection pushdown:

```rust
// Only the requested columns are read from Parquet
let df = ctx.sql("
    SELECT user_id, timestamp
    FROM events
").await?;
```
Parallelism tuning:

```rust
let config = SessionConfig::new()
    .with_target_partitions(16); // Increase for more parallelism
let ctx = SessionContext::new_with_config(config);
```
## Common Patterns
Aggregating across partitions:

```rust
async fn aggregate_partitions(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("
        SELECT
            year,
            month,
            COUNT(*) as total_events,
            COUNT(DISTINCT user_id) as unique_users,
            AVG(duration) as avg_duration
        FROM events
        WHERE year = 2024
        GROUP BY year, month
        ORDER BY month
    ").await?;
    df.show().await?;
    Ok(())
}
```
Time-series analysis:

```rust
async fn time_series_analysis(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("
        SELECT
            DATE_TRUNC('hour', timestamp) as hour,
            COUNT(*) as events_per_hour,
            AVG(value) as avg_value,
            APPROX_PERCENTILE_CONT(value, 0.95) as p95_value
        FROM metrics
        WHERE timestamp >= NOW() - INTERVAL '7 days'
        GROUP BY 1
        ORDER BY 1
    ").await?;
    df.show().await?;
    Ok(())
}
```
Complex joins:

```rust
async fn complex_join(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("
        SELECT
            e.event_type,
            u.country,
            COUNT(*) as count,
            AVG(e.duration) as avg_duration
        FROM events e
        JOIN users u ON e.user_id = u.id
        LEFT JOIN subscriptions s ON u.id = s.user_id
        WHERE e.timestamp >= '2024-01-01'
          AND u.active = true
        GROUP BY e.event_type, u.country
        HAVING COUNT(*) > 100
        ORDER BY count DESC
    ").await?;
    df.show().await?;
    Ok(())
}
```
## Best Practices
- Use partition pruning by filtering on partition columns (year, month, day)
- Select only needed columns to leverage projection pushdown
- Configure appropriate parallelism based on CPU cores and data size
- Use EXPLAIN to verify query optimization
- Stream large results instead of collecting all at once
- Register statistics when creating tables for better query planning
- Create views for commonly used queries
- Use UDFs for custom business logic
## Troubleshooting
Out of memory:
- Reduce the batch size: `.with_batch_size(4096)`
- Set a memory limit: `.with_memory_limit(...)`
- Stream results instead of collecting them all at once
- Enable spilling to disk by configuring a temp file path on the runtime
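A minimal sketch that combines these knobs, assuming a 1 GB budget and a scratch directory at `/tmp/datafusion-spill` (both placeholders):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::execution::disk_manager::DiskManagerConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

// A session tuned for constrained memory: smaller batches, a hard
// memory limit, and a spill directory for operators that exceed it
fn low_memory_context() -> Result<SessionContext> {
    let config = SessionConfig::new().with_batch_size(4096);
    let runtime_config = RuntimeConfig::new()
        .with_memory_limit(1024 * 1024 * 1024, 1.0) // 1 GB
        .with_disk_manager(DiskManagerConfig::new_specified(vec![
            "/tmp/datafusion-spill".into(),
        ]));
    let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
    Ok(SessionContext::new_with_config_rt(config, runtime))
}
```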
Slow queries:
- Use EXPLAIN to inspect the query plan
- Verify that partition pruning is working
- Check whether predicates can be pushed down
- Increase parallelism: `.with_target_partitions(...)`
- Ensure the object store is registered correctly
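`EXPLAIN ANALYZE` goes one step further than `EXPLAIN`: it executes the query and reports per-operator metrics. A small helper (the name is illustrative):

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Run the query and print per-operator execution metrics
async fn profile_query(ctx: &SessionContext, query: &str) -> Result<()> {
    let df = ctx.sql(&format!("EXPLAIN ANALYZE {}", query)).await?;
    df.show().await?;
    Ok(())
}
```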
Schema errors:
- Verify the registered schema: `ctx.table("name").await?.schema()`
- Check for schema evolution across Parquet files
- Use an explicit schema for CSV files
- Handle NULL values appropriately
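A sketch of CSV registration with an explicit schema instead of inference; the field names and types are placeholders:

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};

// An explicit schema avoids inference surprises on messy CSV data
async fn register_csv_with_schema(ctx: &SessionContext) -> Result<()> {
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]);
    ctx.register_csv(
        "users",
        "s3://my-bucket/users.csv",
        CsvReadOptions::new().has_header(true).schema(&schema),
    ).await?;
    Ok(())
}
```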
Partition not found:
- Verify the path format matches Hive partitioning (e.g. `year=2024/month=01/`)
- Check the object store URL registration
- List files under the prefix to debug, as sketched below
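With object_store 0.9, `list` returns a stream of object metadata rather than a future. A debugging sketch, assuming the bucket and prefix from the earlier examples:

```rust
use futures::StreamExt;
use object_store::aws::AmazonS3Builder;
use object_store::path::Path;
use object_store::ObjectStore;

// List objects under a prefix to verify the partition layout
async fn list_partition_files() -> object_store::Result<()> {
    let store = AmazonS3Builder::from_env()
        .with_bucket_name("my-data-lake")
        .build()?;
    let prefix = Path::from("events/year=2024/month=01");
    let mut stream = store.list(Some(&prefix));
    while let Some(meta) = stream.next().await {
        let meta = meta?;
        println!("{} ({} bytes)", meta.location, meta.size);
    }
    Ok(())
}
```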