| description |
|---|
| Create and manage Apache Iceberg tables with ACID transactions and schema evolution |
Apache Iceberg Tables
Help the user work with Apache Iceberg tables for data lakes with ACID transactions, time travel, and schema evolution capabilities.
Steps
- Add required dependencies:
[dependencies]
iceberg = "0.3"
iceberg-catalog-rest = "0.3"
arrow = "52"
parquet = "52"
object_store = "0.9"
tokio = { version = "1", features = ["full"] }
- Set up Iceberg catalog:
use iceberg::{Catalog, TableIdent};
use iceberg_catalog_rest::RestCatalog;

async fn create_catalog() -> Result<RestCatalog> {
    // REST catalog (works with services like Polaris, Nessie, etc.)
    let catalog = RestCatalog::new(
        "http://localhost:8181", // Catalog endpoint
        "warehouse",             // Warehouse location
    ).await?;
    Ok(catalog)
}

// For AWS Glue catalog
// use iceberg_catalog_glue::GlueCatalog;

// For file-based catalog (development)
// use iceberg::catalog::FileCatalog;
- Create an Iceberg table:
use iceberg::{
    spec::{Schema, NestedField, PrimitiveType, Type},
    NamespaceIdent, TableCreation,
};

async fn create_table(catalog: &impl Catalog) -> Result<()> {
    // Define schema
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)),
            NestedField::required(2, "timestamp", Type::Primitive(PrimitiveType::Timestamp)),
            NestedField::required(3, "user_id", Type::Primitive(PrimitiveType::String)),
            NestedField::optional(4, "event_type", Type::Primitive(PrimitiveType::String)),
            NestedField::optional(5, "properties", Type::Primitive(PrimitiveType::String)),
        ])
        .build()?;

    // Define partitioning
    let partition_spec = iceberg::spec::PartitionSpec::builder()
        .with_spec_id(0)
        .add_partition_field(2, "year", iceberg::spec::Transform::Year)?   // Partition by year
        .add_partition_field(2, "month", iceberg::spec::Transform::Month)? // Partition by month
        .build()?;

    // Define sort order (for data clustering)
    let sort_order = iceberg::spec::SortOrder::builder()
        .with_order_id(0)
        .add_sort_field(
            iceberg::spec::SortField::builder()
                .source_id(2) // timestamp field
                .direction(iceberg::spec::SortDirection::Ascending)
                .null_order(iceberg::spec::NullOrder::First)
                .build(),
        )
        .build()?;

    // Create table
    let table_creation = TableCreation::builder()
        .name("events".to_string())
        .schema(schema)
        .partition_spec(partition_spec)
        .sort_order(sort_order)
        .build();

    let namespace = NamespaceIdent::new("db".to_string());
    let table_ident = TableIdent::new(namespace, "events".to_string());
    catalog.create_table(&table_ident, table_creation).await?;

    println!("Table created: db.events");
    Ok(())
}
- Load an existing table:
async fn load_table(catalog: &impl Catalog) -> Result<iceberg::Table> {
    let namespace = NamespaceIdent::new("db".to_string());
    let table_ident = TableIdent::new(namespace, "events".to_string());
    let table = catalog.load_table(&table_ident).await?;

    // Inspect table metadata
    println!("Schema: {:?}", table.metadata().current_schema());
    println!("Location: {}", table.metadata().location());
    println!("Snapshots: {}", table.metadata().snapshots().len());

    Ok(table)
}
- Write data to the Iceberg table:
use iceberg::writer::{IcebergWriter, RecordBatchWriter};
use arrow::record_batch::RecordBatch;

async fn write_data(
    table: &iceberg::Table,
    batches: Vec<RecordBatch>,
) -> Result<()> {
    // Create writer
    let mut writer = table
        .writer()
        .partition_by(table.metadata().default_partition_spec()?)
        .build()
        .await?;

    // Write batches
    for batch in batches {
        writer.write(&batch).await?;
    }

    // Commit (ACID transaction)
    let data_files = writer.close().await?;

    // Create snapshot
    let mut append = table.new_append();
    for file in data_files {
        append.add_data_file(file)?;
    }
    append.commit().await?;

    println!("Data written and committed");
    Ok(())
}
- Read data with time travel:
use iceberg::scan::{TableScan, TableScanBuilder};

async fn read_latest(table: &iceberg::Table) -> Result<Vec<RecordBatch>> {
    // Read latest snapshot
    let scan = table.scan().build().await?;
    let batches = scan.to_arrow().await?;
    Ok(batches)
}

async fn read_snapshot(
    table: &iceberg::Table,
    snapshot_id: i64,
) -> Result<Vec<RecordBatch>> {
    // Time travel to specific snapshot
    let scan = table
        .scan()
        .snapshot_id(snapshot_id)
        .build()
        .await?;
    let batches = scan.to_arrow().await?;
    Ok(batches)
}

async fn read_as_of_timestamp(
    table: &iceberg::Table,
    timestamp_ms: i64,
) -> Result<Vec<RecordBatch>> {
    // Time travel to specific timestamp
    let scan = table
        .scan()
        .as_of_timestamp(timestamp_ms)
        .build()
        .await?;
    let batches = scan.to_arrow().await?;
    Ok(batches)
}
- Perform schema evolution:
async fn evolve_schema(table: &mut iceberg::Table) -> Result<()> {
    // Add new column
    let mut update = table.update_schema();
    update
        .add_column("new_field", Type::Primitive(PrimitiveType::String), true)?
        .commit()
        .await?;
    println!("Added column: new_field");

    // Rename column
    let mut update = table.update_schema();
    update
        .rename_column("old_name", "new_name")?
        .commit()
        .await?;
    println!("Renamed column: old_name -> new_name");

    // Delete column (metadata only)
    let mut update = table.update_schema();
    update
        .delete_column("unused_field")?
        .commit()
        .await?;
    println!("Deleted column: unused_field");

    // Update column type (limited support)
    let mut update = table.update_schema();
    update
        .update_column("numeric_field", Type::Primitive(PrimitiveType::Double))?
        .commit()
        .await?;

    // Reorder columns
    let mut update = table.update_schema();
    update
        .move_first("important_field")?
        .move_after("field_a", "field_b")?
        .commit()
        .await?;

    Ok(())
}
- Query history and snapshots:
async fn inspect_history(table: &iceberg::Table) -> Result<()> {
    let metadata = table.metadata();

    // List all snapshots
    println!("Snapshots:");
    for snapshot in metadata.snapshots() {
        println!(
            "  ID: {}, Timestamp: {}, Summary: {:?}",
            snapshot.snapshot_id(),
            snapshot.timestamp_ms(),
            snapshot.summary()
        );
    }

    // Get current snapshot
    if let Some(current) = metadata.current_snapshot() {
        println!("Current snapshot: {}", current.snapshot_id());
        println!("Manifest list: {}", current.manifest_list());
    }

    // Get schema history
    println!("\nSchema versions:");
    for schema in metadata.schemas() {
        println!("  Schema ID {}: {} fields", schema.schema_id(), schema.fields().len());
    }

    Ok(())
}
Advanced Features
Partition evolution:
async fn evolve_partitioning(table: &mut iceberg::Table) -> Result<()> {
    // Change partition strategy without rewriting data
    let mut update = table.update_partition_spec();

    // Add day partitioning
    update.add_field(
        "timestamp",
        "day",
        iceberg::spec::Transform::Day,
    )?;

    // Remove old month partitioning
    update.remove_field("month")?;

    update.commit().await?;
    println!("Partition spec evolved");
    Ok(())
}
Hidden partitioning:
// Iceberg supports hidden partitioning - partition on derived values
// Users don't need to specify partition columns in queries
async fn create_table_with_hidden_partitioning(catalog: &impl Catalog) -> Result<()> {
    let schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "timestamp", Type::Primitive(PrimitiveType::Timestamp)),
            NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)),
        ])
        .build()?;

    // Partition by year(timestamp) and month(timestamp)
    // But timestamp is a regular column, not a partition column
    let partition_spec = iceberg::spec::PartitionSpec::builder()
        .add_partition_field(1, "year", iceberg::spec::Transform::Year)?
        .add_partition_field(1, "month", iceberg::spec::Transform::Month)?
        .build()?;

    // Now queries like:
    //   SELECT * FROM table WHERE timestamp >= '2024-01-01'
    // will automatically use partition pruning
    Ok(())
}
Incremental reads:
async fn incremental_read(
    table: &iceberg::Table,
    from_snapshot_id: i64,
    to_snapshot_id: Option<i64>,
) -> Result<Vec<RecordBatch>> {
    // Read only data added between snapshots
    let scan = table
        .scan()
        .from_snapshot_id(from_snapshot_id)
        .snapshot_id(to_snapshot_id.unwrap_or_else(|| {
            table.metadata().current_snapshot().unwrap().snapshot_id()
        }))
        .build()
        .await?;
    let batches = scan.to_arrow().await?;
    Ok(batches)
}
Filtering and projection:
use iceberg::expr::{Predicate, Reference};
async fn filtered_scan(table: &iceberg::Table) -> Result<Vec<RecordBatch>> {
    // Build predicate
    let predicate = Predicate::and(
        Predicate::greater_than("timestamp", 1704067200000i64), // > 2024-01-01
        Predicate::equal("event_type", "click"),
    );

    // Scan with predicate pushdown
    let scan = table
        .scan()
        .with_filter(predicate)
        .select(&["user_id", "timestamp", "event_type"]) // Column projection
        .build()
        .await?;
    let batches = scan.to_arrow().await?;
    Ok(batches)
}
Compaction (optimize files):
async fn compact_table(table: &iceberg::Table) -> Result<()> {
    // Read small files
    let scan = table.scan().build().await?;
    let batches = scan.to_arrow().await?;

    // Rewrite as larger, optimized files
    let mut writer = table
        .writer()
        .partition_by(table.metadata().default_partition_spec()?)
        .build()
        .await?;
    for batch in batches {
        writer.write(&batch).await?;
    }
    let new_files = writer.close().await?;

    // Atomic replace
    let mut rewrite = table.new_rewrite();
    rewrite
        .delete_files(/* old files */)
        .add_files(new_files)
        .commit()
        .await?;
    Ok(())
}
Integration with DataFusion
use std::sync::Arc;
use datafusion::prelude::*;
use iceberg::datafusion::IcebergTableProvider;

async fn query_with_datafusion(table: iceberg::Table) -> Result<()> {
    // Create DataFusion context
    let ctx = SessionContext::new();

    // Register Iceberg table
    let provider = IcebergTableProvider::try_new(table).await?;
    ctx.register_table("events", Arc::new(provider))?;

    // Query with SQL
    let df = ctx.sql("
        SELECT
            event_type,
            COUNT(*) as count
        FROM events
        WHERE timestamp >= '2024-01-01'
        GROUP BY event_type
    ").await?;

    df.show().await?;
    Ok(())
}
Common Patterns
Creating a data pipeline:
use std::sync::Arc;
use object_store::ObjectStore;

async fn data_pipeline(
    source_store: Arc<dyn ObjectStore>,
    table: &iceberg::Table,
) -> Result<()> {
    // 1. Read from source (e.g., Parquet)
    let batches = read_parquet_files(source_store).await?;

    // 2. Transform data
    let transformed = transform_batches(batches)?;

    // 3. Write to Iceberg table
    write_data(table, transformed).await?;

    println!("Pipeline complete");
    Ok(())
}
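The pipeline above leaves `read_parquet_files` undefined. A minimal sketch of one possible implementation, using the parquet crate's async Arrow reader over object_store (this assumes the parquet crate's `object_store` feature, a `futures` dependency, and a hypothetical `raw/` prefix; `transform_batches` is the user's own logic and is not shown):
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use futures::TryStreamExt;
use object_store::{path::Path, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

async fn read_parquet_files(store: Arc<dyn ObjectStore>) -> Result<Vec<RecordBatch>> {
    // List Parquet objects under an assumed source prefix
    let metas: Vec<_> = store
        .list(Some(&Path::from("raw/")))
        .try_collect()
        .await?;

    let mut batches = Vec::new();
    for meta in metas {
        // Stream record batches out of each Parquet file
        let reader = ParquetObjectReader::new(store.clone(), meta);
        let stream = ParquetRecordBatchStreamBuilder::new(reader)
            .await?
            .build()?;
        let mut file_batches: Vec<RecordBatch> = stream.try_collect().await?;
        batches.append(&mut file_batches);
    }
    Ok(batches)
}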
Implementing time-based retention:
async fn expire_old_snapshots(table: &mut iceberg::Table, days: i64) -> Result<()> {
    let cutoff_ms = chrono::Utc::now().timestamp_millis() - (days * 24 * 60 * 60 * 1000);

    let mut expire = table.expire_snapshots();
    expire
        .expire_older_than(cutoff_ms)
        .retain_last(10) // Keep at least 10 snapshots
        .commit()
        .await?;

    println!("Expired snapshots older than {} days", days);
    Ok(())
}
Atomic updates:
async fn atomic_update(table: &iceberg::Table) -> Result<()> {
    // All or nothing - either entire commit succeeds or fails
    let mut transaction = table.new_transaction();

    // Multiple operations in one transaction
    transaction.append(/* new data */);
    transaction.update_schema(/* schema change */);
    transaction.update_properties(/* property change */);

    // Atomic commit
    transaction.commit().await?;
    Ok(())
}
Best Practices
- Use hidden partitioning for cleaner queries and easier partition evolution
- Define sort order to cluster related data together
- Expire old snapshots regularly to avoid metadata bloat
- Use schema evolution instead of creating new tables
- Leverage time travel for debugging and auditing
- Compact small files periodically for better read performance
- Use partition evolution to adapt to changing data patterns
- Enable statistics for query optimization
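For the statistics point, Iceberg controls column-level metrics through table properties. A hedged sketch of setting them at table creation time (the property keys come from the Iceberg table-properties spec; the `properties` setter on the builder is assumed to work alongside the creation example above):
use std::collections::HashMap;
use iceberg::TableCreation;
use iceberg::spec::Schema;

// Request full metrics for the timestamp column and truncated metrics elsewhere,
// so scans can prune files by min/max values. Reuses a schema built as in create_table.
fn creation_with_metrics(schema: Schema) -> TableCreation {
    let properties = HashMap::from([
        ("write.metadata.metrics.default".to_string(), "truncate(16)".to_string()),
        ("write.metadata.metrics.column.timestamp".to_string(), "full".to_string()),
    ]);

    TableCreation::builder()
        .name("events".to_string())
        .schema(schema)
        .properties(properties)
        .build()
}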
Benefits Over Raw Parquet
- ACID Transactions: Atomic commits prevent partial updates
- Time Travel: Query historical table states
- Schema Evolution: Add/rename/reorder columns safely
- Partition Evolution: Change partitioning without rewriting
- Hidden Partitioning: Cleaner queries, automatic partition pruning
- Concurrency: Multiple writers with optimistic concurrency
- Metadata Management: Efficient metadata operations
- Data Lineage: Track changes over time
Troubleshooting
Metadata file not found:
- Verify catalog configuration (a quick sanity check is sketched after this list)
- Check object store permissions
- Ensure table was created successfully
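A hedged sanity check for the first two points: list namespaces through the catalog and list an assumed warehouse prefix on the object store. If either call fails, the catalog endpoint or the object store permissions are the likely culprit:
use futures::TryStreamExt;
use iceberg::Catalog;
use object_store::{path::Path, ObjectStore};

async fn sanity_check(catalog: &impl Catalog, store: &dyn ObjectStore) -> Result<()> {
    // Fails if the catalog endpoint or credentials are misconfigured
    let namespaces = catalog.list_namespaces(None).await?;
    println!("Namespaces: {:?}", namespaces);

    // Fails if the object store permissions are wrong ("warehouse/" is an assumed prefix)
    let objects: Vec<_> = store
        .list(Some(&Path::from("warehouse/")))
        .try_collect()
        .await?;
    println!("Objects under warehouse/: {}", objects.len());
    Ok(())
}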
Schema mismatch on write:
- Verify the writer schema matches the table schema (see the check sketched after this list)
- Use schema evolution to add new fields
- Check for required vs. optional fields
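A minimal sketch of that schema check, comparing the Arrow batch columns against the table's current Iceberg schema by name (`field_by_name` is assumed; a fuller check would also compare types and required vs. optional):
use arrow::record_batch::RecordBatch;

fn check_write_schema(table: &iceberg::Table, batch: &RecordBatch) {
    let table_schema = table.metadata().current_schema();
    let batch_schema = batch.schema();
    for field in batch_schema.fields().iter() {
        // field_by_name lookup is an assumption; adjust to the schema API in use
        if table_schema.field_by_name(field.name()).is_none() {
            eprintln!("column `{}` is in the batch but not in the table schema", field.name());
        }
    }
}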
Slow queries:
- Use predicate pushdown with filters
- Enable column projection
- Compact small files
- Verify partition pruning is working
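To confirm pruning, compare how many files a scan plans with and without a partition-aligned filter; far fewer planned files with the filter means pruning is working. This sketch assumes the scan exposes a `plan_files()` stream of file tasks:
use futures::TryStreamExt;
use iceberg::expr::Predicate;

async fn count_planned_files(table: &iceberg::Table, predicate: Option<Predicate>) -> Result<usize> {
    let mut builder = table.scan();
    if let Some(p) = predicate {
        builder = builder.with_filter(p);
    }
    let scan = builder.build().await?;
    // Count the file tasks the scan would actually read
    let tasks: Vec<_> = scan.plan_files().await?.try_collect().await?;
    Ok(tasks.len())
}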
Snapshot expiration issues:
- Ensure retain_last is set appropriately
- Don't expire too aggressively if time travel is needed
- Clean up orphaned files separately
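Orphan cleanup is not covered elsewhere in this guide. The sketch below assumes the set of data-file paths still referenced by manifests has already been collected (that manifest traversal is not shown), and that `table_location` is a store-relative prefix rather than a full URI:
use std::collections::HashSet;
use futures::TryStreamExt;
use object_store::{path::Path, ObjectStore};

async fn remove_orphaned_files(
    store: &dyn ObjectStore,
    table_location: &str,
    referenced: HashSet<String>, // data-file paths referenced by manifests (gathered separately)
    cutoff_ms: i64,
) -> Result<()> {
    let prefix = Path::from(format!("{}/data", table_location));
    let objects: Vec<_> = store.list(Some(&prefix)).try_collect().await?;
    for meta in objects {
        // Only delete files old enough that no in-flight writer can still reference them
        let old_enough = meta.last_modified.timestamp_millis() < cutoff_ms;
        if old_enough && !referenced.contains(meta.location.as_ref()) {
            store.delete(&meta.location).await?;
        }
    }
    Ok(())
}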