description: Create and manage Apache Iceberg tables with ACID transactions and schema evolution

Apache Iceberg Tables

Help the user work with Apache Iceberg tables in data lakes, covering ACID transactions, time travel, and schema evolution.

Steps

  1. Add required dependencies:

    [dependencies]
    iceberg = "0.3"
    iceberg-catalog-rest = "0.3"
    arrow = "52"
    parquet = "52"
    object_store = "0.9"
    tokio = { version = "1", features = ["full"] }
    
  2. Set up Iceberg catalog:

    use iceberg::{Catalog, TableIdent};
    use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
    
    // These examples assume a general-purpose Result alias (e.g. anyhow::Result)
    async fn create_catalog() -> Result<RestCatalog> {
        // REST catalog (works with services like Polaris, Nessie, etc.)
        // Exact builder fields may vary between iceberg-catalog-rest versions
        let config = RestCatalogConfig::builder()
            .uri("http://localhost:8181".to_string()) // Catalog endpoint
            .warehouse("warehouse".to_string())       // Warehouse location
            .build();
    
        Ok(RestCatalog::new(config))
    }
    
    // For AWS Glue catalog
    // use iceberg_catalog_glue::GlueCatalog;
    
    // For file-based catalog (development)
    // use iceberg::catalog::FileCatalog;
    
  3. Create an Iceberg table:

    use iceberg::{
        spec::{Schema, NestedField, PrimitiveType, Type},
        NamespaceIdent, TableCreation,
    };
    
    async fn create_table(catalog: &impl Catalog) -> Result<()> {
        // Define schema
        let schema = Schema::builder()
            .with_fields(vec![
                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)),
                NestedField::required(2, "timestamp", Type::Primitive(PrimitiveType::Timestamp)),
                NestedField::required(3, "user_id", Type::Primitive(PrimitiveType::String)),
                NestedField::optional(4, "event_type", Type::Primitive(PrimitiveType::String)),
                NestedField::optional(5, "properties", Type::Primitive(PrimitiveType::String)),
            ])
            .build()?;
    
        // Define partitioning
        let partition_spec = iceberg::spec::PartitionSpec::builder()
            .with_spec_id(0)
            .add_partition_field(2, "year", iceberg::spec::Transform::Year)? // Partition by year
            .add_partition_field(2, "month", iceberg::spec::Transform::Month)? // Partition by month
            .build()?;
    
        // Define sort order (for data clustering)
        let sort_order = iceberg::spec::SortOrder::builder()
            .with_order_id(0)
            .add_sort_field(
                iceberg::spec::SortField::builder()
                    .source_id(2) // timestamp field
                    .direction(iceberg::spec::SortDirection::Ascending)
                    .null_order(iceberg::spec::NullOrder::First)
                    .build(),
            )
            .build()?;
    
        // Create table
        let table_creation = TableCreation::builder()
            .name("events".to_string())
            .schema(schema)
            .partition_spec(partition_spec)
            .sort_order(sort_order)
            .build();
    
        let namespace = NamespaceIdent::new("db".to_string());
        let table_ident = TableIdent::new(namespace, "events".to_string());
    
        catalog.create_table(&table_ident, table_creation).await?;
    
        println!("Table created: db.events");
        Ok(())
    }
    
  4. Load an existing table:

    async fn load_table(catalog: &impl Catalog) -> Result<iceberg::Table> {
        let namespace = NamespaceIdent::new("db".to_string());
        let table_ident = TableIdent::new(namespace, "events".to_string());
    
        let table = catalog.load_table(&table_ident).await?;
    
        // Inspect table metadata
        println!("Schema: {:?}", table.metadata().current_schema());
        println!("Location: {}", table.metadata().location());
        println!("Snapshots: {}", table.metadata().snapshots().len());
    
        Ok(table)
    }
    
  5. Write data to Iceberg table:

    use iceberg::writer::{IcebergWriter, RecordBatchWriter};
    use arrow::record_batch::RecordBatch;
    
    async fn write_data(
        table: &iceberg::Table,
        batches: Vec<RecordBatch>,
    ) -> Result<()> {
        // Create writer
        let mut writer = table
            .writer()
            .partition_by(table.metadata().default_partition_spec()?)
            .build()
            .await?;
    
        // Write batches
        for batch in batches {
            writer.write(&batch).await?;
        }
    
        // Commit (ACID transaction)
        let data_files = writer.close().await?;
    
        // Create snapshot
        let mut append = table.new_append();
        for file in data_files {
            append.add_data_file(file)?;
        }
        append.commit().await?;
    
        println!("Data written and committed");
        Ok(())
    }
    
  6. Read data with time travel:

    use iceberg::scan::{TableScan, TableScanBuilder};
    
    async fn read_latest(table: &iceberg::Table) -> Result<Vec<RecordBatch>> {
        // Read latest snapshot
        let scan = table.scan().build().await?;
    
        let batches = scan.to_arrow().await?;
    
        Ok(batches)
    }
    
    async fn read_snapshot(
        table: &iceberg::Table,
        snapshot_id: i64,
    ) -> Result<Vec<RecordBatch>> {
        // Time travel to specific snapshot
        let scan = table
            .scan()
            .snapshot_id(snapshot_id)
            .build()
            .await?;
    
        let batches = scan.to_arrow().await?;
    
        Ok(batches)
    }
    
    async fn read_as_of_timestamp(
        table: &iceberg::Table,
        timestamp_ms: i64,
    ) -> Result<Vec<RecordBatch>> {
        // Time travel to specific timestamp
        let scan = table
            .scan()
            .as_of_timestamp(timestamp_ms)
            .build()
            .await?;
    
        let batches = scan.to_arrow().await?;
    
        Ok(batches)
    }
    
  7. Perform schema evolution:

    async fn evolve_schema(table: &mut iceberg::Table) -> Result<()> {
        // Add new column
        let mut update = table.update_schema();
        update
            .add_column("new_field", Type::Primitive(PrimitiveType::String), true)?
            .commit()
            .await?;
    
        println!("Added column: new_field");
    
        // Rename column
        let mut update = table.update_schema();
        update
            .rename_column("old_name", "new_name")?
            .commit()
            .await?;
    
        println!("Renamed column: old_name -> new_name");
    
        // Delete column (metadata only)
        let mut update = table.update_schema();
        update
            .delete_column("unused_field")?
            .commit()
            .await?;
    
        println!("Deleted column: unused_field");
    
        // Update column type (limited support)
        let mut update = table.update_schema();
        update
            .update_column("numeric_field", Type::Primitive(PrimitiveType::Double))?
            .commit()
            .await?;
    
        // Reorder columns
        let mut update = table.update_schema();
        update
            .move_first("important_field")?
            .move_after("field_a", "field_b")?
            .commit()
            .await?;
    
        Ok(())
    }
    
  8. Query history and snapshots:

    async fn inspect_history(table: &iceberg::Table) -> Result<()> {
        let metadata = table.metadata();
    
        // List all snapshots
        println!("Snapshots:");
        for snapshot in metadata.snapshots() {
            println!(
                "  ID: {}, Timestamp: {}, Summary: {:?}",
                snapshot.snapshot_id(),
                snapshot.timestamp_ms(),
                snapshot.summary()
            );
        }
    
        // Get current snapshot
        if let Some(current) = metadata.current_snapshot() {
            println!("Current snapshot: {}", current.snapshot_id());
            println!("Manifest list: {}", current.manifest_list());
        }
    
        // Get schema history
        println!("\nSchema versions:");
        for schema in metadata.schemas() {
            println!("  Schema ID {}: {} fields", schema.schema_id(), schema.fields().len());
        }
    
        Ok(())
    }
    

Advanced Features

Partition evolution:

async fn evolve_partitioning(table: &mut iceberg::Table) -> Result<()> {
    // Change partition strategy without rewriting data
    let mut update = table.update_partition_spec();

    // Add day partitioning
    update.add_field(
        "timestamp",
        "day",
        iceberg::spec::Transform::Day,
    )?;

    // Remove old month partitioning
    update.remove_field("month")?;

    update.commit().await?;

    println!("Partition spec evolved");
    Ok(())
}

Hidden partitioning:

// Iceberg supports hidden partitioning - partition on derived values
// Users don't need to specify partition columns in queries

async fn create_table_with_hidden_partitioning(catalog: &impl Catalog) -> Result<()> {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "timestamp", Type::Primitive(PrimitiveType::Timestamp)),
            NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)),
        ])
        .build()?;

    // Partition by year(timestamp) and month(timestamp)
    // But timestamp is a regular column, not a partition column
    let partition_spec = iceberg::spec::PartitionSpec::builder()
        .add_partition_field(1, "year", iceberg::spec::Transform::Year)?
        .add_partition_field(1, "month", iceberg::spec::Transform::Month)?
        .build()?;

    // Pass schema and partition_spec to TableCreation (as in step 3) to create the table.
    // Queries like:
    //   SELECT * FROM table WHERE timestamp >= '2024-01-01'
    // will then use partition pruning automatically, without referencing partition columns.

    Ok(())
}

Incremental reads:

async fn incremental_read(
    table: &iceberg::Table,
    from_snapshot_id: i64,
    to_snapshot_id: Option<i64>,
) -> Result<Vec<RecordBatch>> {
    // Read only data added between snapshots
    let scan = table
        .scan()
        .from_snapshot_id(from_snapshot_id)
        .snapshot_id(to_snapshot_id.unwrap_or_else(|| {
            table.metadata().current_snapshot().unwrap().snapshot_id()
        }))
        .build()
        .await?;

    let batches = scan.to_arrow().await?;

    Ok(batches)
}

Filtering and projection:

use iceberg::expr::{Predicate, Reference};

async fn filtered_scan(table: &iceberg::Table) -> Result<Vec<RecordBatch>> {
    // Build predicate
    let predicate = Predicate::and(
        Predicate::greater_than("timestamp", 1704067200000i64), // > 2024-01-01
        Predicate::equal("event_type", "click"),
    );

    // Scan with predicate pushdown
    let scan = table
        .scan()
        .with_filter(predicate)
        .select(&["user_id", "timestamp", "event_type"]) // Column projection
        .build()
        .await?;

    let batches = scan.to_arrow().await?;

    Ok(batches)
}

Compaction (optimize files):

async fn compact_table(table: &iceberg::Table) -> Result<()> {
    // Read small files
    let scan = table.scan().build().await?;
    let batches = scan.to_arrow().await?;

    // Rewrite as larger, optimized files
    let mut writer = table
        .writer()
        .partition_by(table.metadata().default_partition_spec()?)
        .build()
        .await?;

    for batch in batches {
        writer.write(&batch).await?;
    }

    let new_files = writer.close().await?;

    // Atomic replace
    let mut rewrite = table.new_rewrite();
    rewrite
        .delete_files(/* old files */)
        .add_files(new_files)
        .commit()
        .await?;

    Ok(())
}

Integration with DataFusion

use std::sync::Arc;

use datafusion::prelude::*;
// IcebergTableProvider lives in the separate iceberg-datafusion crate
use iceberg_datafusion::IcebergTableProvider;

async fn query_with_datafusion(table: iceberg::Table) -> Result<()> {
    // Create DataFusion context
    let ctx = SessionContext::new();

    // Register the Iceberg table (the provider constructor may differ between crate versions)
    let provider = IcebergTableProvider::try_new(table).await?;
    ctx.register_table("events", Arc::new(provider))?;

    // Query with SQL
    let df = ctx.sql("
        SELECT
            event_type,
            COUNT(*) as count
        FROM events
        WHERE timestamp >= '2024-01-01'
        GROUP BY event_type
    ").await?;

    df.show().await?;

    Ok(())
}

Common Patterns

Creating a data pipeline:

use std::sync::Arc;

use object_store::ObjectStore;

async fn data_pipeline(
    source_store: Arc<dyn ObjectStore>,
    table: &iceberg::Table,
) -> Result<()> {
    // 1. Read from source (e.g., Parquet) using the helper sketched below
    let batches = read_parquet_files(source_store).await?;

    // 2. Transform data
    let transformed = transform_batches(batches)?;

    // 3. Write to Iceberg table
    write_data(table, transformed).await?;

    println!("Pipeline complete");
    Ok(())
}
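
The pipeline above leaves read_parquet_files and transform_batches to the user. A minimal sketch of what they might look like, assuming a single hypothetical input path (input/events.parquet), the parquet crate's async feature, and the futures crate:

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use futures::TryStreamExt;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

// Hypothetical helper: read every batch from one Parquet object.
// A real pipeline would list a prefix and loop over the matching objects,
// ideally streaming batches into the writer instead of collecting in memory.
async fn read_parquet_files(store: Arc<dyn ObjectStore>) -> Result<Vec<RecordBatch>> {
    let path = Path::from("input/events.parquet"); // placeholder path
    let meta = store.head(&path).await?;

    let reader = ParquetObjectReader::new(store, meta);
    let stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await?
        .build()?;

    Ok(stream.try_collect().await?)
}

// Hypothetical helper: transform batches before writing to Iceberg
fn transform_batches(batches: Vec<RecordBatch>) -> Result<Vec<RecordBatch>> {
    // Filter rows, cast columns, or add derived fields here
    Ok(batches)
}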

Implementing time-based retention:

async fn expire_old_snapshots(table: &mut iceberg::Table, days: i64) -> Result<()> {
    // Requires the chrono crate (not in the dependency list above)
    let cutoff_ms = chrono::Utc::now().timestamp_millis() - (days * 24 * 60 * 60 * 1000);

    let mut expire = table.expire_snapshots();
    expire
        .expire_older_than(cutoff_ms)
        .retain_last(10) // Keep at least 10 snapshots
        .commit()
        .await?;

    println!("Expired snapshots older than {} days", days);
    Ok(())
}

Atomic updates:

async fn atomic_update(table: &iceberg::Table) -> Result<()> {
    // All or nothing - either entire commit succeeds or fails
    let mut transaction = table.new_transaction();

    // Multiple operations in one transaction
    transaction.append(/* new data */);
    transaction.update_schema(/* schema change */);
    transaction.update_properties(/* property change */);

    // Atomic commit
    transaction.commit().await?;

    Ok(())
}

Best Practices

  • Use hidden partitioning for cleaner queries and easier partition evolution
  • Define sort order to cluster related data together
  • Expire old snapshots regularly to avoid metadata bloat
  • Use schema evolution instead of creating new tables
  • Leverage time travel for debugging and auditing
  • Compact small files periodically for better read performance
  • Use partition evolution to adapt to changing data patterns
  • Enable statistics for query optimization (see the table-property sketch below)
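
Column-level statistics are controlled through standard Iceberg table properties such as write.metadata.metrics.default. A minimal sketch of enabling full metrics, assuming update_properties accepts a property map as in the atomic-update example above:

use std::collections::HashMap;

async fn enable_full_column_metrics(table: &iceberg::Table) -> Result<()> {
    // "full" keeps value counts and bounds for every column (the default truncates)
    let props = HashMap::from([(
        "write.metadata.metrics.default".to_string(),
        "full".to_string(),
    )]);

    // Assumed to take a property map, mirroring the atomic-update example
    let mut transaction = table.new_transaction();
    transaction.update_properties(props);
    transaction.commit().await?;

    Ok(())
}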

Benefits Over Raw Parquet

  1. ACID Transactions: Atomic commits prevent partial updates
  2. Time Travel: Query historical table states
  3. Schema Evolution: Add/rename/reorder columns safely
  4. Partition Evolution: Change partitioning without rewriting
  5. Hidden Partitioning: Cleaner queries, automatic partition pruning
  6. Concurrency: Multiple writers with optimistic concurrency
  7. Metadata Management: Efficient metadata operations
  8. Data Lineage: Track changes over time

Troubleshooting

Metadata file not found:

  • Verify catalog configuration (a quick listing check is sketched below)
  • Check object store permissions
  • Ensure table was created successfully
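
A quick way to check the first and third points is to list what the catalog can actually see. A minimal sketch, assuming the Catalog trait exposes list_namespaces and list_tables:

use iceberg::{Catalog, NamespaceIdent};

async fn check_catalog(catalog: &impl Catalog) -> Result<()> {
    // If this call fails, the catalog endpoint or credentials are the problem
    let namespaces = catalog.list_namespaces(None).await?;
    println!("Namespaces: {:?}", namespaces);

    // If the expected table is missing here, creation did not succeed
    let namespace = NamespaceIdent::new("db".to_string());
    let tables = catalog.list_tables(&namespace).await?;
    println!("Tables in db: {:?}", tables);

    Ok(())
}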

Schema mismatch on write:

  • Verify the writer schema matches the table schema (see the comparison sketch below)
  • Use schema evolution to add new fields
  • Check for required vs. optional fields
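
To see exactly where a writer batch and the table disagree, compare the two schemas before writing. A sketch, assuming a conversion helper like iceberg::arrow::schema_to_arrow_schema is available in your version of the crate:

use arrow::record_batch::RecordBatch;

fn check_write_schema(table: &iceberg::Table, batch: &RecordBatch) -> Result<()> {
    // Convert the table's current Iceberg schema to Arrow for comparison
    let expected = iceberg::arrow::schema_to_arrow_schema(table.metadata().current_schema())?;

    if batch.schema().as_ref() != &expected {
        println!("Schema mismatch:");
        println!("  table:  {:?}", expected);
        println!("  writer: {:?}", batch.schema());
    }

    Ok(())
}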

Slow queries:

  • Use predicate pushdown with filters
  • Enable column projection
  • Compact small files
  • Verify partition pruning is working (see the file-count sketch below)
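
One way to confirm pruning is to compare how many files the scan plans with and without a filter. A sketch, reusing the Predicate style from the filtering example and assuming the scan exposes a plan_files stream of file tasks:

use futures::TryStreamExt;
use iceberg::expr::Predicate;

async fn count_planned_files(table: &iceberg::Table) -> Result<usize> {
    // With pruning working, a filtered scan should plan far fewer files
    let scan = table
        .scan()
        .with_filter(Predicate::greater_than("timestamp", 1704067200000i64))
        .build()
        .await?;

    let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
    println!("Files planned: {}", tasks.len());

    Ok(tasks.len())
}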

Snapshot expiration issues:

  • Ensure retain_last is set appropriately
  • Don't expire too aggressively if time travel is needed
  • Clean up orphaned files separately

Resources