description: Execute SQL queries with DataFusion against Parquet, CSV, and in-memory data

DataFusion Query Execution

Help the user set up DataFusion and execute SQL queries against data stored in object storage (Parquet, CSV) or in memory. A minimal end-to-end sketch tying the steps together appears after the numbered list.

Steps

  1. Add required dependencies:

    [dependencies]
    datafusion = "39"
    arrow = "52"
    object_store = "0.9"
    url = "2"
    futures = "0.3"
    num_cpus = "1"
    tokio = { version = "1", features = ["full"] }
    
  2. Create a DataFusion session context:

    use datafusion::error::Result;
    use datafusion::prelude::*;
    use datafusion::execution::context::{SessionContext, SessionConfig};
    use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeConfig};
    use std::sync::Arc;

    async fn create_context() -> Result<SessionContext> {
        // Configure session
        let config = SessionConfig::new()
            .with_target_partitions(num_cpus::get()) // Match CPU count
            .with_batch_size(8192); // Rows per batch

        // Configure runtime: 4 GB memory limit (use all of it) and a temp
        // directory so operators can spill to disk
        let runtime_config = RuntimeConfig::new()
            .with_memory_limit(4 * 1024 * 1024 * 1024, 1.0)
            .with_temp_file_path("/tmp/datafusion");

        let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);

        Ok(SessionContext::new_with_config_rt(config, runtime))
    }
    
  3. Register object store for S3/Azure/GCS:

    use object_store::aws::AmazonS3Builder;
    
    async fn register_object_store(ctx: &SessionContext) -> Result<()> {
        // Create S3 store
        let s3 = AmazonS3Builder::from_env()
            .with_bucket_name("my-data-lake")
            .build()?;
    
        // Register with DataFusion
        let url = "s3://my-data-lake/";
        ctx.runtime_env().register_object_store(
            &url::Url::parse(url)?,
            Arc::new(s3),
        );
    
        Ok(())
    }
    
  4. Register Parquet tables:

    use datafusion::datasource::listing::{
        ListingOptions,
        ListingTable,
        ListingTableConfig,
        ListingTableUrl,
    };
    use datafusion::datasource::file_format::parquet::ParquetFormat;
    
    async fn register_parquet_table(
        ctx: &SessionContext,
        table_name: &str,
        path: &str,
    ) -> Result<()> {
        // Simple registration
        ctx.register_parquet(
            table_name,
            path,
            ParquetReadOptions::default(),
        ).await?;
    
        Ok(())
    }
    
    // Advanced registration with partitioning
    async fn register_partitioned_table(
        ctx: &SessionContext,
        table_name: &str,
        path: &str,
    ) -> Result<()> {
        let table_path = ListingTableUrl::parse(path)?;
    
        let file_format = ParquetFormat::default();
    
        let listing_options = ListingOptions::new(Arc::new(file_format))
            .with_file_extension(".parquet")
            .with_target_partitions(ctx.state().config().target_partitions())
            .with_collect_stat(true); // Collect file statistics
    
        // Infer the file schema from the files at the path;
        // ListingTable::try_new requires a schema
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(listing_options)
            .infer_schema(&ctx.state())
            .await?;

        let table = ListingTable::try_new(config)?;
    
        ctx.register_table(table_name, Arc::new(table))?;
    
        Ok(())
    }
    
  5. Execute SQL queries:

    use arrow::record_batch::RecordBatch;

    async fn execute_sql(ctx: &SessionContext, query: &str) -> Result<Vec<RecordBatch>> {
        // Create DataFrame from SQL
        let df = ctx.sql(query).await?;
    
        // Collect all results
        let batches = df.collect().await?;
    
        Ok(batches)
    }
    
    // Example queries
    async fn example_queries(ctx: &SessionContext) -> Result<()> {
        // Simple select
        let df = ctx.sql("
            SELECT user_id, event_type, COUNT(*) as count
            FROM events
            WHERE date >= '2024-01-01'
            GROUP BY user_id, event_type
            ORDER BY count DESC
            LIMIT 100
        ").await?;
    
        df.show().await?;
    
        // Window functions
        let df = ctx.sql("
            SELECT
                user_id,
                timestamp,
                amount,
                SUM(amount) OVER (
                    PARTITION BY user_id
                    ORDER BY timestamp
                    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
                ) as running_total
            FROM transactions
        ").await?;
    
        df.show().await?;
    
        // Joins
        let df = ctx.sql("
            SELECT
                e.user_id,
                u.name,
                COUNT(*) as event_count
            FROM events e
            JOIN users u ON e.user_id = u.id
            GROUP BY e.user_id, u.name
        ").await?;
    
        df.show().await?;
    
        Ok(())
    }
    
  6. Use DataFrame API as an alternative to SQL:

    use datafusion::prelude::*;
    
    async fn dataframe_api_examples(ctx: &SessionContext) -> Result<()> {
        // Get table
        let df = ctx.table("events").await?;
    
        // Filter
        let df = df.filter(col("timestamp").gt(lit("2024-01-01")))?;
    
        // Select columns
        let df = df.select(vec![
            col("user_id"),
            col("event_type"),
            col("timestamp"),
        ])?;
    
        // Aggregate
        let df = df.aggregate(
            vec![col("user_id"), col("event_type")],
            vec![
                count(lit(1)).alias("count"), // COUNT(*) equivalent
                avg(col("duration")).alias("avg_duration"),
                max(col("timestamp")).alias("max_time"),
            ],
        )?;
    
        // Sort
        let df = df.sort(vec![
            col("count").sort(false, true), // DESC NULLS LAST
        ])?;
    
        // Limit
        let df = df.limit(0, Some(100))?;
    
        // Execute
        let batches = df.collect().await?;
    
        Ok(())
    }
    
  7. Stream results for large queries:

    use futures::stream::StreamExt;
    
    async fn stream_query_results(
        ctx: &SessionContext,
        query: &str,
    ) -> Result<()> {
        let df = ctx.sql(query).await?;
    
        // Get streaming results
        let mut stream = df.execute_stream().await?;
    
        // Process batches incrementally
        let mut total_rows = 0;
        while let Some(batch) = stream.next().await {
            let batch = batch?;
            total_rows += batch.num_rows();
    
            // Process this batch
            process_batch(&batch)?;
    
            println!("Processed {} rows so far...", total_rows);
        }
    
        println!("Total rows: {}", total_rows);
        Ok(())
    }
    
    fn process_batch(batch: &RecordBatch) -> Result<()> {
        // Your processing logic
        Ok(())
    }
    
  8. Inspect query plans for optimization:

    async fn explain_query(ctx: &SessionContext, query: &str) -> Result<()> {
        // Logical plan
        let logical_plan = ctx.sql(query).await?.into_optimized_plan()?;
        println!("Logical Plan:\n{}", logical_plan.display_indent());
    
        // Physical plan
        let df = ctx.sql(query).await?;
        let physical_plan = df.create_physical_plan().await?;
        println!("Physical Plan:\n{}", physical_plan.display_indent());
    
        // Or use EXPLAIN in SQL
        let df = ctx.sql(&format!("EXPLAIN {}", query)).await?;
        df.show().await?;
    
        Ok(())
    }
    

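Putting the steps together, here is a minimal end-to-end sketch. It assumes a local Parquet file at a placeholder path and placeholder column names; swap in an object-store URL once a store is registered.

use datafusion::error::Result;
use datafusion::prelude::*;

// Build a context, register a table, run a query, and print the result.
#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Hypothetical local file used for illustration
    ctx.register_parquet("events", "./data/events.parquet", ParquetReadOptions::default()).await?;

    let df = ctx
        .sql("SELECT event_type, COUNT(*) AS count FROM events GROUP BY event_type")
        .await?;
    df.show().await?;

    Ok(())
}
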
Advanced Features

Register CSV tables:

use datafusion::datasource::file_format::csv::CsvFormat;

async fn register_csv(ctx: &SessionContext) -> Result<()> {
    ctx.register_csv(
        "users",
        "s3://my-bucket/users.csv",
        CsvReadOptions::new()
            .has_header(true)
            .delimiter(b',')
            .schema_infer_max_records(1000),
    ).await?;

    Ok(())
}

Register in-memory tables:

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;

async fn register_memory_table(
    ctx: &SessionContext,
    name: &str,
    batches: Vec<RecordBatch>,
    schema: SchemaRef,
) -> Result<()> {
    let mem_table = MemTable::try_new(schema, vec![batches])?;
    ctx.register_table(name, Arc::new(mem_table))?;
    Ok(())
}

Create temporary views:

async fn create_view(ctx: &SessionContext) -> Result<()> {
    // Create view from query
    let df = ctx.sql("
        SELECT user_id, COUNT(*) as count
        FROM events
        GROUP BY user_id
    ").await?;

    ctx.register_table("user_counts", df.into_view())?;

    // Now query the view
    let results = ctx.sql("SELECT * FROM user_counts WHERE count > 100").await?;
    results.show().await?;

    Ok(())
}

User-Defined Functions (UDFs):

use std::sync::Arc;
use arrow::array::StringArray;
use arrow::datatypes::DataType;
use datafusion::logical_expr::{create_udf, ColumnarValue, Volatility};

async fn register_udfs(ctx: &SessionContext) -> Result<()> {
    // Create scalar UDF
    let extract_domain = create_udf(
        "extract_domain",
        vec![DataType::Utf8],
        Arc::new(DataType::Utf8),
        Volatility::Immutable,
        Arc::new(|args: &[ColumnarValue]| {
            // Materialize the argument as an array (it arrives as a column here,
            // so the row-count hint is unused)
            let urls = args[0].clone().into_array(1)?;
            let urls = urls.as_any().downcast_ref::<StringArray>().unwrap();

            let domains: StringArray = urls
                .iter()
                .map(|url| {
                    url.and_then(|u| url::Url::parse(u).ok())
                        .and_then(|u| u.host_str().map(|s| s.to_string()))
                })
                .collect();

            Ok(ColumnarValue::Array(Arc::new(domains)))
        }),
    );

    ctx.register_udf(extract_domain);

    // Use in query
    let df = ctx.sql("
        SELECT
            extract_domain(url) as domain,
            COUNT(*) as count
        FROM events
        GROUP BY domain
    ").await?;

    df.show().await?;

    Ok(())
}

Write query results to Parquet:

async fn write_query_results(
    ctx: &SessionContext,
    query: &str,
    output_path: &str,
) -> Result<()> {
    let df = ctx.sql(query).await?;

    // Write to Parquet
    df.write_parquet(
        output_path,
        DataFrameWriteOptions::new(),
        Some(WriterProperties::builder()
            .set_compression(Compression::ZSTD(ZstdLevel::try_new(3)?))
            .build()),
    ).await?;

    Ok(())
}

Performance Optimization

Partition pruning:

// DataFusion automatically prunes partitions based on WHERE clauses
async fn partition_pruning_example(ctx: &SessionContext) -> Result<()> {
    // Assuming Hive-style partitioning: year=2024/month=01/...

    // This query only scans year=2024/month=01 partitions
    let df = ctx.sql("
        SELECT * FROM events
        WHERE year = 2024 AND month = 1
    ").await?;

    // Use EXPLAIN to verify partition pruning
    let explain = ctx.sql("EXPLAIN SELECT * FROM events WHERE year = 2024 AND month = 1").await?;
    explain.show().await?;

    Ok(())
}
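
Pruning only works if the partition columns are declared when the table is registered. A minimal sketch, assuming a hypothetical s3://my-data-lake/events/ layout with Hive-style year=/month= directories:

use arrow::datatypes::DataType;
use datafusion::prelude::*;

// Declare the directory-encoded columns so filters on them prune whole
// directories instead of scanning every file.
async fn register_hive_partitioned(ctx: &SessionContext) -> datafusion::error::Result<()> {
    ctx.register_parquet(
        "events",
        "s3://my-data-lake/events/", // hypothetical table root
        ParquetReadOptions::default().table_partition_cols(vec![
            ("year".to_string(), DataType::Int32),
            ("month".to_string(), DataType::Int32),
        ]),
    ).await?;
    Ok(())
}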

Predicate pushdown:

// DataFusion pushes predicates to Parquet readers automatically
// This reads only relevant row groups based on statistics

let df = ctx.sql("
    SELECT * FROM events
    WHERE user_id = 'user123'
      AND timestamp >= '2024-01-01'
").await?;

Projection pushdown:

// Only requested columns are read from Parquet
let df = ctx.sql("
    SELECT user_id, timestamp
    FROM events
").await?; // Only reads user_id and timestamp columns

Parallelism tuning:

let config = SessionConfig::new()
    .with_target_partitions(16); // Increase for better parallelism

let ctx = SessionContext::new_with_config(config);

Common Patterns

Aggregating across partitions:

async fn aggregate_partitions(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("
        SELECT
            year,
            month,
            COUNT(*) as total_events,
            COUNT(DISTINCT user_id) as unique_users,
            AVG(duration) as avg_duration
        FROM events
        WHERE year = 2024
        GROUP BY year, month
        ORDER BY month
    ").await?;

    df.show().await?;
    Ok(())
}

Time-series analysis:

async fn time_series_analysis(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("
        SELECT
            DATE_TRUNC('hour', timestamp) as hour,
            COUNT(*) as events_per_hour,
            AVG(value) as avg_value,
            APPROX_PERCENTILE_CONT(value, 0.95) as p95_value
        FROM metrics
        WHERE timestamp >= NOW() - INTERVAL '7 days'
        GROUP BY 1
        ORDER BY 1
    ").await?;

    df.show().await?;
    Ok(())
}

Complex joins:

async fn complex_join(ctx: &SessionContext) -> Result<()> {
    let df = ctx.sql("
        SELECT
            e.event_type,
            u.country,
            COUNT(*) as count,
            AVG(e.duration) as avg_duration
        FROM events e
        JOIN users u ON e.user_id = u.id
        LEFT JOIN subscriptions s ON u.id = s.user_id
        WHERE e.timestamp >= '2024-01-01'
          AND u.active = true
        GROUP BY e.event_type, u.country
        HAVING COUNT(*) > 100
        ORDER BY count DESC
    ").await?;

    df.show().await?;
    Ok(())
}

Best Practices

  • Use partition pruning by filtering on partition columns (year, month, day)
  • Select only needed columns to leverage projection pushdown
  • Configure appropriate parallelism based on CPU cores and data size
  • Use EXPLAIN to verify query optimization
  • Stream large results instead of collecting all at once
  • Register statistics when creating tables for better query planning
  • Create views for commonly used queries
  • Use UDFs for custom business logic

Troubleshooting

Out of memory:

  • Reduce batch size: .with_batch_size(4096)
  • Set memory limit: .with_memory_limit()
  • Stream results instead of collecting
  • Enable spilling to disk with temp_file_path
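
If the defaults still run out of memory, one option is a spill-aware memory pool combined with a temp directory. A minimal sketch, assuming DataFusion's FairSpillPool; the 2 GB budget, batch size, and spill path are placeholders:

use std::sync::Arc;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::*;

// Memory-constrained context: a 2 GB spill-aware pool plus a temp directory
// so sorts and joins spill to disk instead of aborting the query.
fn memory_constrained_context() -> datafusion::error::Result<SessionContext> {
    let runtime = RuntimeConfig::new()
        .with_memory_pool(Arc::new(FairSpillPool::new(2 * 1024 * 1024 * 1024)))
        .with_temp_file_path("/tmp/datafusion-spill");
    let config = SessionConfig::new().with_batch_size(4096); // smaller batches
    Ok(SessionContext::new_with_config_rt(
        config,
        Arc::new(RuntimeEnv::new(runtime)?),
    ))
}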

Slow queries:

  • Use EXPLAIN to inspect query plan
  • Verify partition pruning is working
  • Check if predicates can be pushed down
  • Increase parallelism: .with_target_partitions()
  • Ensure object store is registered correctly
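
EXPLAIN ANALYZE executes the query and reports per-operator metrics, which makes it easier to see where time is spent and whether pruning and pushdown actually happened. A short sketch, reusing the ctx from earlier and a placeholder query:

// Runs the query and prints per-operator metrics (output rows, elapsed compute time)
let df = ctx.sql("EXPLAIN ANALYZE SELECT user_id, COUNT(*) FROM events GROUP BY user_id").await?;
df.show().await?;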

Schema errors:

  • Verify table registration: ctx.table("name").await?.schema()
  • Check for schema evolution in Parquet files
  • Use explicit schema for CSV files
  • Handle NULL values appropriately
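
For CSV, pinning an explicit schema avoids inference surprises. A minimal sketch with placeholder column names, types, and path:

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::*;

// Register a CSV table with a fixed schema instead of relying on inference.
async fn register_csv_with_schema(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("signup_date", DataType::Date32, true),
    ]);
    ctx.register_csv(
        "users",
        "./data/users.csv", // hypothetical path
        CsvReadOptions::new().has_header(true).schema(&schema),
    ).await?;
    Ok(())
}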

Partition not found:

  • Verify path format matches Hive partitioning
  • Check object store URL registration
  • List files to debug by listing the prefix directly on the object store (see the sketch below)
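
To confirm the files are actually where DataFusion expects them, list the prefix directly on the object store. A minimal sketch, assuming the object_store 0.9 API (where list returns a stream) and a hypothetical prefix:

use futures::StreamExt;
use object_store::{path::Path, ObjectStore};

// List everything under a prefix; an empty listing usually means the partition
// path or the registered object store URL is wrong.
async fn debug_list(store: &dyn ObjectStore) -> object_store::Result<()> {
    let prefix = Path::from("events/year=2024/month=01"); // hypothetical prefix
    let mut entries = store.list(Some(&prefix));
    while let Some(meta) = entries.next().await {
        let meta = meta?;
        println!("{} ({} bytes)", meta.location, meta.size);
    }
    Ok(())
}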