Initial commit

This commit is contained in:
Zhongwei Li
2025-11-29 18:25:45 +08:00
commit 919e6673e7
13 changed files with 4381 additions and 0 deletions


@@ -0,0 +1,549 @@
---
description: Execute SQL queries with DataFusion against Parquet, CSV, and in-memory data
---
# DataFusion Query Execution
Help the user set up DataFusion and execute SQL queries against data stored in object storage (Parquet, CSV) or in-memory.
## Steps
1. **Add required dependencies**:
```toml
[dependencies]
datafusion = "39"
arrow = "52"
object_store = "0.9"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
```
2. **Create a DataFusion session context**:
```rust
use datafusion::prelude::*;
use datafusion::execution::context::{SessionContext, SessionConfig};
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeConfig};
use datafusion::error::Result;
use std::sync::Arc;
async fn create_context() -> Result<SessionContext> {
// Configure session
let config = SessionConfig::new()
.with_target_partitions(std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1)) // Match CPU count
.with_batch_size(8192); // Rows per batch
// Configure runtime
let runtime_config = RuntimeConfig::new()
.with_memory_limit(4 * 1024 * 1024 * 1024, 1.0) // 4GB memory limit (pool size, fraction to use)
.with_temp_file_path("/tmp/datafusion");
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
Ok(SessionContext::new_with_config_rt(config, runtime))
}
```
3. **Register object store** for S3/Azure/GCS:
```rust
use object_store::aws::AmazonS3Builder;
async fn register_object_store(ctx: &SessionContext) -> Result<()> {
// Create S3 store
let s3 = AmazonS3Builder::from_env()
.with_bucket_name("my-data-lake")
.build()?;
// Register with DataFusion (URL parsing uses the `url` crate)
let url = url::Url::parse("s3://my-data-lake/").expect("valid object store URL");
ctx.runtime_env().register_object_store(&url, Arc::new(s3));
Ok(())
}
```
4. **Register Parquet tables**:
```rust
use datafusion::datasource::listing::{
ListingOptions,
ListingTable,
ListingTableConfig,
ListingTableUrl,
};
use datafusion::datasource::file_format::parquet::ParquetFormat;
async fn register_parquet_table(
ctx: &SessionContext,
table_name: &str,
path: &str,
) -> Result<()> {
// Simple registration
ctx.register_parquet(
table_name,
path,
ParquetReadOptions::default(),
).await?;
Ok(())
}
// Advanced registration with partitioning
async fn register_partitioned_table(
ctx: &SessionContext,
table_name: &str,
path: &str,
) -> Result<()> {
let table_path = ListingTableUrl::parse(path)?;
let file_format = ParquetFormat::default();
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(".parquet")
.with_target_partitions(ctx.state().config().target_partitions())
.with_collect_stat(true); // Collect file statistics
let config = ListingTableConfig::new(table_path)
.with_listing_options(listing_options)
.infer_schema(&ctx.state()) // Resolve the table schema from the data files
.await?;
let table = ListingTable::try_new(config)?;
ctx.register_table(table_name, Arc::new(table))?;
Ok(())
}
```
5. **Execute SQL queries**:
```rust
use datafusion::arrow::record_batch::RecordBatch;
async fn execute_sql(ctx: &SessionContext, query: &str) -> Result<Vec<RecordBatch>> {
// Create DataFrame from SQL
let df = ctx.sql(query).await?;
// Collect all results
let batches = df.collect().await?;
Ok(batches)
}
// Example queries
async fn example_queries(ctx: &SessionContext) -> Result<()> {
// Simple select
let df = ctx.sql("
SELECT user_id, event_type, COUNT(*) as count
FROM events
WHERE date >= '2024-01-01'
GROUP BY user_id, event_type
ORDER BY count DESC
LIMIT 100
").await?;
df.show().await?;
// Window functions
let df = ctx.sql("
SELECT
user_id,
timestamp,
amount,
SUM(amount) OVER (
PARTITION BY user_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_total
FROM transactions
").await?;
df.show().await?;
// Joins
let df = ctx.sql("
SELECT
e.user_id,
u.name,
COUNT(*) as event_count
FROM events e
JOIN users u ON e.user_id = u.id
GROUP BY e.user_id, u.name
").await?;
df.show().await?;
Ok(())
}
```
6. **Use DataFrame API** as an alternative to SQL:
```rust
use datafusion::prelude::*;
async fn dataframe_api_examples(ctx: &SessionContext) -> Result<()> {
// Get table
let df = ctx.table("events").await?;
// Filter
let df = df.filter(col("timestamp").gt(lit("2024-01-01")))?;
// Select columns
let df = df.select(vec![
col("user_id"),
col("event_type"),
col("timestamp"),
])?;
// Aggregate
let df = df.aggregate(
vec![col("user_id"), col("event_type")],
vec![
count(lit(1)).alias("count"), // counts all rows, i.e. COUNT(*)
avg(col("duration")).alias("avg_duration"),
max(col("timestamp")).alias("max_time"),
],
)?;
// Sort
let df = df.sort(vec![
col("count").sort(false, true), // DESC NULLS LAST
])?;
// Limit
let df = df.limit(0, Some(100))?;
// Execute
let batches = df.collect().await?;
Ok(())
}
```
7. **Stream results** for large queries:
```rust
use futures::stream::StreamExt;
async fn stream_query_results(
ctx: &SessionContext,
query: &str,
) -> Result<()> {
let df = ctx.sql(query).await?;
// Get streaming results
let mut stream = df.execute_stream().await?;
// Process batches incrementally
let mut total_rows = 0;
while let Some(batch) = stream.next().await {
let batch = batch?;
total_rows += batch.num_rows();
// Process this batch
process_batch(&batch)?;
println!("Processed {} rows so far...", total_rows);
}
println!("Total rows: {}", total_rows);
Ok(())
}
fn process_batch(batch: &RecordBatch) -> Result<()> {
// Your processing logic
Ok(())
}
```
8. **Inspect query plans** for optimization:
```rust
async fn explain_query(ctx: &SessionContext, query: &str) -> Result<()> {
// Logical plan
let logical_plan = ctx.sql(query).await?.into_optimized_plan()?;
println!("Logical Plan:\n{}", logical_plan.display_indent());
// Physical plan
let df = ctx.sql(query).await?;
let physical_plan = df.create_physical_plan().await?;
println!("Physical Plan:\n{}", physical_plan.display_indent());
// Or use EXPLAIN in SQL
let df = ctx.sql(&format!("EXPLAIN {}", query)).await?;
df.show().await?;
Ok(())
}
```
## Advanced Features
**Register CSV tables**:
```rust
use datafusion::datasource::file_format::csv::CsvFormat;
async fn register_csv(ctx: &SessionContext) -> Result<()> {
ctx.register_csv(
"users",
"s3://my-bucket/users.csv",
CsvReadOptions::new()
.has_header(true)
.delimiter(b',')
.schema_infer_max_records(1000),
).await?;
Ok(())
}
```
**Register in-memory tables**:
```rust
use datafusion::datasource::MemTable;
async fn register_memory_table(
ctx: &SessionContext,
name: &str,
batches: Vec<RecordBatch>,
schema: SchemaRef,
) -> Result<()> {
let mem_table = MemTable::try_new(schema, vec![batches])?;
ctx.register_table(name, Arc::new(mem_table))?;
Ok(())
}
```
**Create temporary views**:
```rust
async fn create_view(ctx: &SessionContext) -> Result<()> {
// Create view from query
let df = ctx.sql("
SELECT user_id, COUNT(*) as count
FROM events
GROUP BY user_id
").await?;
ctx.register_table("user_counts", df.into_view())?;
// Now query the view
let results = ctx.sql("SELECT * FROM user_counts WHERE count > 100").await?;
results.show().await?;
Ok(())
}
```
**User-Defined Functions (UDFs)**:
```rust
use datafusion::logical_expr::{create_udf, Volatility, ColumnarValue};
use arrow::array::StringArray;
use arrow::datatypes::DataType;
async fn register_udfs(ctx: &SessionContext) -> Result<()> {
// Create scalar UDF
let extract_domain = create_udf(
"extract_domain",
vec![DataType::Utf8],
Arc::new(DataType::Utf8),
Volatility::Immutable,
Arc::new(|args: &[ColumnarValue]| {
let urls = args[0].clone().into_array(1)?;
let urls = urls.as_any().downcast_ref::<StringArray>().unwrap();
let domains: StringArray = urls
.iter()
.map(|url| {
url.and_then(|u| url::Url::parse(u).ok())
.and_then(|u| u.host_str().map(|s| s.to_string()))
})
.collect();
Ok(ColumnarValue::Array(Arc::new(domains)))
}),
);
ctx.register_udf(extract_domain);
// Use in query
let df = ctx.sql("
SELECT
extract_domain(url) as domain,
COUNT(*) as count
FROM events
GROUP BY domain
").await?;
df.show().await?;
Ok(())
}
```
**Write query results to Parquet**:
```rust
use datafusion::dataframe::DataFrameWriteOptions;
async fn write_query_results(
ctx: &SessionContext,
query: &str,
output_path: &str,
) -> Result<()> {
let df = ctx.sql(query).await?;
// Write to Parquet
df.write_parquet(
output_path,
DataFrameWriteOptions::new(),
Some(WriterProperties::builder()
.set_compression(Compression::ZSTD(ZstdLevel::try_new(3)?))
.build()),
).await?;
Ok(())
}
```
## Performance Optimization
**Partition pruning**:
```rust
// DataFusion automatically prunes partitions based on WHERE clauses
async fn partition_pruning_example(ctx: &SessionContext) -> Result<()> {
// Assuming Hive-style partitioning: year=2024/month=01/...
// This query only scans year=2024/month=01 partitions
let df = ctx.sql("
SELECT * FROM events
WHERE year = 2024 AND month = 1
").await?;
// Use EXPLAIN to verify partition pruning
let explain = ctx.sql("EXPLAIN SELECT * FROM events WHERE year = 2024 AND month = 1").await?;
explain.show().await?;
Ok(())
}
```
**Predicate pushdown**:
```rust
// DataFusion pushes predicates to Parquet readers automatically
// This reads only relevant row groups based on statistics
let df = ctx.sql("
SELECT * FROM events
WHERE user_id = 'user123'
AND timestamp >= '2024-01-01'
").await?;
```
**Projection pushdown**:
```rust
// Only requested columns are read from Parquet
let df = ctx.sql("
SELECT user_id, timestamp
FROM events
").await?; // Only reads user_id and timestamp columns
```
**Parallelism tuning**:
```rust
let config = SessionConfig::new()
.with_target_partitions(16); // Increase for better parallelism
let ctx = SessionContext::new_with_config(config);
```
## Common Patterns
**Aggregating across partitions**:
```rust
async fn aggregate_partitions(ctx: &SessionContext) -> Result<()> {
let df = ctx.sql("
SELECT
year,
month,
COUNT(*) as total_events,
COUNT(DISTINCT user_id) as unique_users,
AVG(duration) as avg_duration
FROM events
WHERE year = 2024
GROUP BY year, month
ORDER BY month
").await?;
df.show().await?;
Ok(())
}
```
**Time-series analysis**:
```rust
async fn time_series_analysis(ctx: &SessionContext) -> Result<()> {
let df = ctx.sql("
SELECT
DATE_TRUNC('hour', timestamp) as hour,
COUNT(*) as events_per_hour,
AVG(value) as avg_value,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) as p95_value
FROM metrics
WHERE timestamp >= NOW() - INTERVAL '7 days'
GROUP BY 1
ORDER BY 1
").await?;
df.show().await?;
Ok(())
}
```
**Complex joins**:
```rust
async fn complex_join(ctx: &SessionContext) -> Result<()> {
let df = ctx.sql("
SELECT
e.event_type,
u.country,
COUNT(*) as count,
AVG(e.duration) as avg_duration
FROM events e
JOIN users u ON e.user_id = u.id
LEFT JOIN subscriptions s ON u.id = s.user_id
WHERE e.timestamp >= '2024-01-01'
AND u.active = true
GROUP BY e.event_type, u.country
HAVING count > 100
ORDER BY count DESC
").await?;
df.show().await?;
Ok(())
}
```
## Best Practices
- **Use partition pruning** by filtering on partition columns (year, month, day)
- **Select only needed columns** to leverage projection pushdown
- **Configure appropriate parallelism** based on CPU cores and data size
- **Use EXPLAIN** to verify query optimization
- **Stream large results** instead of collecting all at once
- **Register statistics** when creating tables for better query planning
- **Create views** for commonly used queries
- **Use UDFs** for custom business logic
## Troubleshooting
**Out of memory**:
- Reduce batch size: `.with_batch_size(4096)`
- Set memory limit: `.with_memory_limit()`
- Stream results instead of collecting
- Enable spilling to disk with temp_file_path
**Slow queries**:
- Use EXPLAIN to inspect query plan
- Verify partition pruning is working
- Check if predicates can be pushed down
- Increase parallelism: `.with_target_partitions()`
- Ensure object store is registered correctly
**Schema errors**:
- Verify table registration: `ctx.table("name").await?.schema()`
- Check for schema evolution in Parquet files
- Use an explicit schema for CSV files (see the sketch after this list)
- Handle NULL values appropriately
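For the explicit-schema point, a minimal sketch, assuming a `users` CSV with `id` and `name` columns (the table name, path, and fields are illustrative):
```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::*;

async fn register_csv_with_schema(ctx: &SessionContext) -> datafusion::error::Result<()> {
    // Declare the schema up front instead of relying on inference
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]);
    ctx.register_csv(
        "users",
        "s3://my-bucket/users.csv",
        CsvReadOptions::new().has_header(true).schema(&schema),
    ).await?;
    Ok(())
}
```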
**Partition not found**:
- Verify path format matches Hive partitioning
- Check object store URL registration
- List files to debug by iterating the stream from `store.list(Some(&prefix))`


@@ -0,0 +1,549 @@
---
description: Create and manage Apache Iceberg tables with ACID transactions and schema evolution
---
# Apache Iceberg Tables
Help the user work with Apache Iceberg tables for data lakes with ACID transactions, time travel, and schema evolution capabilities.
## Steps
1. **Add required dependencies**:
```toml
[dependencies]
iceberg = "0.3"
iceberg-catalog-rest = "0.3"
arrow = "52"
parquet = "52"
object_store = "0.9"
tokio = { version = "1", features = ["full"] }
```
2. **Set up Iceberg catalog**:
```rust
use iceberg::{Catalog, TableIdent};
use iceberg_catalog_rest::RestCatalog;
async fn create_catalog() -> Result<RestCatalog> {
// REST catalog (works with services like Polaris, Nessie, etc.)
let catalog = RestCatalog::new(
"http://localhost:8181", // Catalog endpoint
"warehouse", // Warehouse location
).await?;
Ok(catalog)
}
// For AWS Glue catalog
// use iceberg_catalog_glue::GlueCatalog;
// For file-based catalog (development)
// use iceberg::catalog::FileCatalog;
```
3. **Create an Iceberg table**:
```rust
use iceberg::{
spec::{Schema, NestedField, PrimitiveType, Type},
NamespaceIdent, TableCreation,
};
async fn create_table(catalog: &impl Catalog) -> Result<()> {
// Define schema
let schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)),
NestedField::required(2, "timestamp", Type::Primitive(PrimitiveType::Timestamp)),
NestedField::required(3, "user_id", Type::Primitive(PrimitiveType::String)),
NestedField::optional(4, "event_type", Type::Primitive(PrimitiveType::String)),
NestedField::optional(5, "properties", Type::Primitive(PrimitiveType::String)),
])
.build()?;
// Define partitioning
let partition_spec = iceberg::spec::PartitionSpec::builder()
.with_spec_id(0)
.add_partition_field(2, "year", iceberg::spec::Transform::Year)? // Partition by year
.add_partition_field(2, "month", iceberg::spec::Transform::Month)? // Partition by month
.build()?;
// Define sort order (for data clustering)
let sort_order = iceberg::spec::SortOrder::builder()
.with_order_id(0)
.add_sort_field(
iceberg::spec::SortField::builder()
.source_id(2) // timestamp field
.direction(iceberg::spec::SortDirection::Ascending)
.null_order(iceberg::spec::NullOrder::First)
.build(),
)
.build()?;
// Create table
let table_creation = TableCreation::builder()
.name("events".to_string())
.schema(schema)
.partition_spec(partition_spec)
.sort_order(sort_order)
.build();
let namespace = NamespaceIdent::new("db".to_string());
let table_ident = TableIdent::new(namespace, "events".to_string());
catalog.create_table(&table_ident, table_creation).await?;
println!("Table created: db.events");
Ok(())
}
```
4. **Load an existing table**:
```rust
async fn load_table(catalog: &impl Catalog) -> Result<iceberg::Table> {
let namespace = NamespaceIdent::new("db".to_string());
let table_ident = TableIdent::new(namespace, "events".to_string());
let table = catalog.load_table(&table_ident).await?;
// Inspect table metadata
println!("Schema: {:?}", table.metadata().current_schema());
println!("Location: {}", table.metadata().location());
println!("Snapshots: {}", table.metadata().snapshots().len());
Ok(table)
}
```
5. **Write data to Iceberg table**:
```rust
use iceberg::writer::{IcebergWriter, RecordBatchWriter};
use arrow::record_batch::RecordBatch;
async fn write_data(
table: &iceberg::Table,
batches: Vec<RecordBatch>,
) -> Result<()> {
// Create writer
let mut writer = table
.writer()
.partition_by(table.metadata().default_partition_spec()?)
.build()
.await?;
// Write batches
for batch in batches {
writer.write(&batch).await?;
}
// Commit (ACID transaction)
let data_files = writer.close().await?;
// Create snapshot
let mut append = table.new_append();
for file in data_files {
append.add_data_file(file)?;
}
append.commit().await?;
println!("Data written and committed");
Ok(())
}
```
6. **Read data with time travel**:
```rust
use iceberg::scan::{TableScan, TableScanBuilder};
async fn read_latest(table: &iceberg::Table) -> Result<Vec<RecordBatch>> {
// Read latest snapshot
let scan = table.scan().build().await?;
let batches = scan.to_arrow().await?;
Ok(batches)
}
async fn read_snapshot(
table: &iceberg::Table,
snapshot_id: i64,
) -> Result<Vec<RecordBatch>> {
// Time travel to specific snapshot
let scan = table
.scan()
.snapshot_id(snapshot_id)
.build()
.await?;
let batches = scan.to_arrow().await?;
Ok(batches)
}
async fn read_as_of_timestamp(
table: &iceberg::Table,
timestamp_ms: i64,
) -> Result<Vec<RecordBatch>> {
// Time travel to specific timestamp
let scan = table
.scan()
.as_of_timestamp(timestamp_ms)
.build()
.await?;
let batches = scan.to_arrow().await?;
Ok(batches)
}
```
7. **Perform schema evolution**:
```rust
async fn evolve_schema(table: &mut iceberg::Table) -> Result<()> {
// Add new column
let mut update = table.update_schema();
update
.add_column("new_field", Type::Primitive(PrimitiveType::String), true)?
.commit()
.await?;
println!("Added column: new_field");
// Rename column
let mut update = table.update_schema();
update
.rename_column("old_name", "new_name")?
.commit()
.await?;
println!("Renamed column: old_name -> new_name");
// Delete column (metadata only)
let mut update = table.update_schema();
update
.delete_column("unused_field")?
.commit()
.await?;
println!("Deleted column: unused_field");
// Update column type (limited support)
let mut update = table.update_schema();
update
.update_column("numeric_field", Type::Primitive(PrimitiveType::Double))?
.commit()
.await?;
// Reorder columns
let mut update = table.update_schema();
update
.move_first("important_field")?
.move_after("field_a", "field_b")?
.commit()
.await?;
Ok(())
}
```
8. **Query history and snapshots**:
```rust
async fn inspect_history(table: &iceberg::Table) -> Result<()> {
let metadata = table.metadata();
// List all snapshots
println!("Snapshots:");
for snapshot in metadata.snapshots() {
println!(
" ID: {}, Timestamp: {}, Summary: {:?}",
snapshot.snapshot_id(),
snapshot.timestamp_ms(),
snapshot.summary()
);
}
// Get current snapshot
if let Some(current) = metadata.current_snapshot() {
println!("Current snapshot: {}", current.snapshot_id());
println!("Manifest list: {}", current.manifest_list());
}
// Get schema history
println!("\nSchema versions:");
for schema in metadata.schemas() {
println!(" Schema ID {}: {} fields", schema.schema_id(), schema.fields().len());
}
Ok(())
}
```
## Advanced Features
**Partition evolution**:
```rust
async fn evolve_partitioning(table: &mut iceberg::Table) -> Result<()> {
// Change partition strategy without rewriting data
let mut update = table.update_partition_spec();
// Add day partitioning
update.add_field(
"timestamp",
"day",
iceberg::spec::Transform::Day,
)?;
// Remove old month partitioning
update.remove_field("month")?;
update.commit().await?;
println!("Partition spec evolved");
Ok(())
}
```
**Hidden partitioning**:
```rust
// Iceberg supports hidden partitioning - partition on derived values
// Users don't need to specify partition columns in queries
async fn create_table_with_hidden_partitioning(catalog: &impl Catalog) -> Result<()> {
let schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "timestamp", Type::Primitive(PrimitiveType::Timestamp)),
NestedField::required(2, "data", Type::Primitive(PrimitiveType::String)),
])
.build()?;
// Partition by year(timestamp) and month(timestamp)
// But timestamp is a regular column, not a partition column
let partition_spec = iceberg::spec::PartitionSpec::builder()
.add_partition_field(1, "year", iceberg::spec::Transform::Year)?
.add_partition_field(1, "month", iceberg::spec::Transform::Month)?
.build()?;
// Now queries like:
// SELECT * FROM table WHERE timestamp >= '2024-01-01'
// Will automatically use partition pruning
Ok(())
}
```
**Incremental reads**:
```rust
async fn incremental_read(
table: &iceberg::Table,
from_snapshot_id: i64,
to_snapshot_id: Option<i64>,
) -> Result<Vec<RecordBatch>> {
// Read only data added between snapshots
let scan = table
.scan()
.from_snapshot_id(from_snapshot_id)
.snapshot_id(to_snapshot_id.unwrap_or_else(|| {
table.metadata().current_snapshot().unwrap().snapshot_id()
}))
.build()
.await?;
let batches = scan.to_arrow().await?;
Ok(batches)
}
```
**Filtering and projection**:
```rust
use iceberg::expr::{Predicate, Reference};
async fn filtered_scan(table: &iceberg::Table) -> Result<Vec<RecordBatch>> {
// Build predicate
let predicate = Predicate::and(
Predicate::greater_than("timestamp", 1704067200000i64), // > 2024-01-01
Predicate::equal("event_type", "click"),
);
// Scan with predicate pushdown
let scan = table
.scan()
.with_filter(predicate)
.select(&["user_id", "timestamp", "event_type"]) // Column projection
.build()
.await?;
let batches = scan.to_arrow().await?;
Ok(batches)
}
```
**Compaction (optimize files)**:
```rust
async fn compact_table(table: &iceberg::Table) -> Result<()> {
// Read small files
let scan = table.scan().build().await?;
let batches = scan.to_arrow().await?;
// Rewrite as larger, optimized files
let mut writer = table
.writer()
.partition_by(table.metadata().default_partition_spec()?)
.build()
.await?;
for batch in batches {
writer.write(&batch).await?;
}
let new_files = writer.close().await?;
// Atomic replace
let mut rewrite = table.new_rewrite();
rewrite
.delete_files(/* old files */)
.add_files(new_files)
.commit()
.await?;
Ok(())
}
```
## Integration with DataFusion
```rust
use datafusion::prelude::*;
use iceberg::datafusion::IcebergTableProvider;
async fn query_with_datafusion(table: iceberg::Table) -> Result<()> {
// Create DataFusion context
let ctx = SessionContext::new();
// Register Iceberg table
let provider = IcebergTableProvider::try_new(table).await?;
ctx.register_table("events", Arc::new(provider))?;
// Query with SQL
let df = ctx.sql("
SELECT
event_type,
COUNT(*) as count
FROM events
WHERE timestamp >= '2024-01-01'
GROUP BY event_type
").await?;
df.show().await?;
Ok(())
}
```
## Common Patterns
**Creating a data pipeline**:
```rust
async fn data_pipeline(
source_store: Arc<dyn ObjectStore>,
table: &iceberg::Table,
) -> Result<()> {
// 1. Read from source (e.g., Parquet)
let batches = read_parquet_files(source_store).await?;
// 2. Transform data
let transformed = transform_batches(batches)?;
// 3. Write to Iceberg table
write_data(table, transformed).await?;
println!("Pipeline complete");
Ok(())
}
```
**Implementing time-based retention**:
```rust
async fn expire_old_snapshots(table: &mut iceberg::Table, days: i64) -> Result<()> {
let cutoff_ms = chrono::Utc::now().timestamp_millis() - (days * 24 * 60 * 60 * 1000);
let mut expire = table.expire_snapshots();
expire
.expire_older_than(cutoff_ms)
.retain_last(10) // Keep at least 10 snapshots
.commit()
.await?;
println!("Expired snapshots older than {} days", days);
Ok(())
}
```
**Atomic updates**:
```rust
async fn atomic_update(table: &iceberg::Table) -> Result<()> {
// All or nothing - either entire commit succeeds or fails
let mut transaction = table.new_transaction();
// Multiple operations in one transaction
transaction.append(/* new data */);
transaction.update_schema(/* schema change */);
transaction.update_properties(/* property change */);
// Atomic commit
transaction.commit().await?;
Ok(())
}
```
## Best Practices
- **Use hidden partitioning** for cleaner queries and easier partition evolution
- **Define sort order** to cluster related data together
- **Expire old snapshots** regularly to avoid metadata bloat
- **Use schema evolution** instead of creating new tables
- **Leverage time travel** for debugging and auditing
- **Compact small files** periodically for better read performance
- **Use partition evolution** to adapt to changing data patterns
- **Enable statistics** for query optimization
## Benefits Over Raw Parquet
1. **ACID Transactions**: Atomic commits prevent partial updates
2. **Time Travel**: Query historical table states
3. **Schema Evolution**: Add/rename/reorder columns safely
4. **Partition Evolution**: Change partitioning without rewriting
5. **Hidden Partitioning**: Cleaner queries, automatic partition pruning
6. **Concurrency**: Multiple writers with optimistic concurrency
7. **Metadata Management**: Efficient metadata operations
8. **Data Lineage**: Track changes over time
## Troubleshooting
**Metadata file not found**:
- Verify catalog configuration
- Check object store permissions
- Ensure table was created successfully
**Schema mismatch on write**:
- Verify writer schema matches table schema
- Use schema evolution to add new fields
- Check for required vs. optional fields
**Slow queries**:
- Use predicate pushdown with filters
- Enable column projection
- Compact small files
- Verify partition pruning is working
**Snapshot expiration issues**:
- Ensure retain_last is set appropriately
- Don't expire too aggressively if time travel is needed
- Clean up orphaned files separately
## Resources
- [Apache Iceberg Specification](https://iceberg.apache.org/spec/)
- [iceberg-rust Documentation](https://docs.rs/iceberg/)
- [Iceberg Table Format](https://iceberg.apache.org/docs/latest/)


@@ -0,0 +1,147 @@
---
description: Configure object_store for cloud storage (S3, Azure, GCS, or local filesystem)
---
# Object Store Setup
Help the user configure the `object_store` crate for their cloud provider or local filesystem.
## Steps
1. **Identify the storage backend** by asking the user which provider they want to use:
- Amazon S3
- Azure Blob Storage
- Google Cloud Storage
- Local filesystem (for development/testing)
2. **Add the dependency** to their Cargo.toml:
```toml
[dependencies]
object_store = { version = "0.9", features = ["aws", "azure", "gcp"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
```
3. **Create the appropriate builder** based on their choice:
**For Amazon S3**:
```rust
use object_store::aws::AmazonS3Builder;
use object_store::{ObjectStore, RetryConfig};
use std::sync::Arc;
use std::time::Duration;
let s3 = AmazonS3Builder::new()
.with_region("us-east-1")
.with_bucket_name("my-data-lake")
.with_access_key_id(access_key)
.with_secret_access_key(secret_key)
// Production settings
.with_retry(RetryConfig {
max_retries: 3,
retry_timeout: Duration::from_secs(10),
..Default::default()
})
.build()?;
let store: Arc<dyn ObjectStore> = Arc::new(s3);
```
**For Azure Blob Storage**:
```rust
use object_store::azure::MicrosoftAzureBuilder;
let azure = MicrosoftAzureBuilder::new()
.with_account("mystorageaccount")
.with_container_name("mycontainer")
.with_access_key(access_key)
.build()?;
let store: Arc<dyn ObjectStore> = Arc::new(azure);
```
**For Google Cloud Storage**:
```rust
use object_store::gcs::GoogleCloudStorageBuilder;
let gcs = GoogleCloudStorageBuilder::new()
.with_service_account_key(service_account_json)
.with_bucket_name("my-bucket")
.build()?;
let store: Arc<dyn ObjectStore> = Arc::new(gcs);
```
**For Local Filesystem**:
```rust
use object_store::local::LocalFileSystem;
let local = LocalFileSystem::new_with_prefix("/tmp/data-lake")?;
let store: Arc<dyn ObjectStore> = Arc::new(local);
```
4. **Test the connection** by listing objects or performing a simple operation:
```rust
use futures::stream::StreamExt;
use object_store::path::Path;
// List objects with a prefix
let prefix = Path::from("data/");
let mut list = store.list(Some(&prefix));
while let Some(meta) = list.next().await {
let meta = meta?;
println!("{}: {} bytes", meta.location, meta.size);
}
```
5. **Add error handling** and configuration management:
```rust
use object_store::Error as ObjectStoreError;
async fn create_store() -> Result<Arc<dyn ObjectStore>, ObjectStoreError> {
// Get credentials from environment or config
let region = std::env::var("AWS_REGION")
.unwrap_or_else(|_| "us-east-1".to_string());
let bucket = std::env::var("S3_BUCKET").expect("S3_BUCKET must be set");
let s3 = AmazonS3Builder::from_env()
.with_region(&region)
.with_bucket_name(&bucket)
.build()?;
Ok(Arc::new(s3))
}
```
## Best Practices
- **Use Arc<dyn ObjectStore>** for shared ownership across threads
- **Configure retry logic** for production resilience
- **Store credentials securely** using environment variables or secret managers
- **Use LocalFileSystem** for testing to avoid cloud costs
- **Enable request timeouts** to prevent hanging operations (see the sketch after this list)
- **Set up connection pooling** for better performance
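For the timeout and retry bullets above, a minimal sketch for S3 (the bucket name and limits are illustrative); because each store instance reuses one internal HTTP client, sharing a single `Arc<dyn ObjectStore>` also gives you connection reuse:
```rust
use object_store::aws::AmazonS3Builder;
use object_store::{ClientOptions, ObjectStore, RetryConfig};
use std::sync::Arc;
use std::time::Duration;

fn create_s3_with_timeouts() -> object_store::Result<Arc<dyn ObjectStore>> {
    let client_options = ClientOptions::new()
        .with_connect_timeout(Duration::from_secs(5)) // fail fast on unreachable endpoints
        .with_timeout(Duration::from_secs(30));       // cap the duration of each request
    let s3 = AmazonS3Builder::from_env()
        .with_bucket_name("my-data-lake")
        .with_client_options(client_options)
        .with_retry(RetryConfig::default())
        .build()?;
    Ok(Arc::new(s3))
}
```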
## Common Patterns
**Environment-based configuration**:
```rust
let s3 = AmazonS3Builder::from_env()
.with_bucket_name(&bucket)
.build()?;
```
**Multipart upload for large files**:
```rust
use tokio::io::AsyncWriteExt;
// object_store 0.9 exposes multipart uploads as an AsyncWrite sink
let (_upload_id, mut writer) = store.put_multipart(&path).await?;
for chunk in chunks {
writer.write_all(&chunk).await?;
}
writer.shutdown().await?; // finalizes the multipart upload
```
**Streaming downloads**:
```rust
let result = store.get(&path).await?;
let mut stream = result.into_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
// Process chunk
}
```


@@ -0,0 +1,359 @@
---
description: Read Parquet files efficiently with predicate pushdown and column projection
---
# Read Parquet Files
Help the user read Parquet files from object storage with optimal performance using predicate pushdown, column projection, and row group filtering.
## Steps
1. **Add required dependencies**:
```toml
[dependencies]
parquet = "52"
arrow = "52"
object_store = "0.9"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
```
2. **Create a basic Parquet reader** from object_store:
```rust
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use object_store::{ObjectStore, path::Path};
use arrow::record_batch::RecordBatch;
use futures::stream::StreamExt;
use std::sync::Arc;
async fn read_parquet(
store: Arc<dyn ObjectStore>,
path: &str,
) -> Result<Vec<RecordBatch>> {
let path = Path::from(path);
// Get file metadata
let meta = store.head(&path).await?;
// Create reader
let reader = ParquetObjectReader::new(store, meta);
// Build stream
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let mut stream = builder.build()?;
// Collect batches
let mut batches = Vec::new();
while let Some(batch) = stream.next().await {
batches.push(batch?);
}
Ok(batches)
}
```
3. **Add column projection** to read only needed columns:
```rust
use parquet::arrow::ProjectionMask;
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
// Inspect the Arrow schema to see which columns are available
println!("Available columns: {:?}", builder.schema().fields());
// Project specific columns by index
// (ProjectionMask::roots takes the Parquet schema descriptor, not the Arrow schema)
let projection = ProjectionMask::roots(builder.parquet_schema(), vec![0, 2, 5]);
let builder = builder.with_projection(projection);
// Or project by column name (helper function)
fn project_columns(
builder: ParquetRecordBatchStreamBuilder<ParquetObjectReader>,
column_names: &[&str],
) -> ParquetRecordBatchStreamBuilder<ParquetObjectReader> {
let indices: Vec<usize> = column_names
.iter()
.filter_map(|name| builder.schema().index_of(name).ok())
.collect();
let projection = ProjectionMask::roots(builder.parquet_schema(), indices);
builder.with_projection(projection)
}
let builder = project_columns(builder, &["user_id", "timestamp", "event_type"]);
```
4. **Add row group filtering** using statistics:
```rust
use parquet::file::metadata::ParquetMetaData;
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let metadata = builder.metadata();
// Filter row groups based on statistics
let row_groups_to_read: Vec<usize> = metadata
.row_groups()
.iter()
.enumerate()
.filter_map(|(idx, rg)| {
// Example: filter by min/max values
let col_metadata = rg.column(0); // First column
if let Some(stats) = col_metadata.statistics() {
// Check if row group might contain relevant data
// This is pseudo-code; actual implementation depends on data type
if stats_match_predicate(stats) {
return Some(idx);
}
}
None
})
.collect();
let builder = builder.with_row_groups(row_groups_to_read);
```
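For reference, a minimal sketch of what `stats_match_predicate` could look like, assuming the inspected column is an Int64 timestamp and we keep row groups that may contain values at or above a cutoff (the column choice and cutoff are illustrative):
```rust
use parquet::file::statistics::Statistics;

const CUTOFF_MS: i64 = 1_704_067_200_000; // 2024-01-01T00:00:00Z

// Keep a row group only if its max value could satisfy `timestamp >= CUTOFF_MS`
fn stats_match_predicate(stats: &Statistics) -> bool {
    match stats {
        Statistics::Int64(s) if s.has_min_max_set() => *s.max() >= CUTOFF_MS,
        // No usable statistics: be conservative and read the row group
        _ => true,
    }
}
```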
5. **Implement streaming processing** for large files:
```rust
async fn process_large_parquet(
store: Arc<dyn ObjectStore>,
path: &str,
) -> Result<()> {
let path = Path::from(path);
let meta = store.head(&path).await?;
let reader = ParquetObjectReader::new(store, meta);
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
// Limit batch size to control memory usage
let builder = builder.with_batch_size(8192);
let mut stream = builder.build()?;
// Process batches incrementally
while let Some(batch) = stream.next().await {
let batch = batch?;
// Process this batch
println!("Processing batch with {} rows", batch.num_rows());
process_batch(&batch)?;
// Batch is dropped here, freeing memory
}
Ok(())
}
fn process_batch(batch: &RecordBatch) -> Result<()> {
// Your processing logic
Ok(())
}
```
6. **Add comprehensive error handling**:
```rust
use thiserror::Error;
#[derive(Error, Debug)]
enum ParquetReadError {
#[error("Object store error: {0}")]
ObjectStore(#[from] object_store::Error),
#[error("Parquet error: {0}")]
Parquet(#[from] parquet::errors::ParquetError),
#[error("Arrow error: {0}")]
Arrow(#[from] arrow::error::ArrowError),
#[error("File not found: {0}")]
FileNotFound(String),
}
async fn read_with_error_handling(
store: Arc<dyn ObjectStore>,
path: &str,
) -> Result<Vec<RecordBatch>, ParquetReadError> {
let path = Path::from(path);
// HEAD the file once: it both checks existence and returns the metadata we need
let meta = store
.head(&path)
.await
.map_err(|_| ParquetReadError::FileNotFound(path.to_string()))?;
let reader = ParquetObjectReader::new(store, meta);
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let mut stream = builder.build()?;
let mut batches = Vec::new();
while let Some(batch) = stream.next().await {
batches.push(batch?);
}
Ok(batches)
}
```
## Performance Optimization
**Reading with all optimizations**:
```rust
async fn optimized_read(
store: Arc<dyn ObjectStore>,
path: &str,
columns: &[&str],
) -> Result<Vec<RecordBatch>> {
let path = Path::from(path);
let meta = store.head(&path).await?;
let reader = ParquetObjectReader::new(store, meta);
let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
// 1. Column projection
let indices: Vec<usize> = columns
.iter()
.filter_map(|name| builder.schema().index_of(name).ok())
.collect();
// ProjectionMask::roots expects the Parquet schema descriptor
let projection = ProjectionMask::roots(builder.parquet_schema(), indices);
builder = builder.with_projection(projection);
// 2. Batch size tuning
builder = builder.with_batch_size(8192);
// 3. Row group filtering (if applicable)
// builder = builder.with_row_groups(filtered_row_groups);
let mut stream = builder.build()?;
let mut batches = Vec::new();
while let Some(batch) = stream.next().await {
batches.push(batch?);
}
Ok(batches)
}
```
## Reading Metadata Only
```rust
async fn read_metadata(
store: Arc<dyn ObjectStore>,
path: &str,
) -> Result<()> {
let path = Path::from(path);
let meta = store.head(&path).await?;
let reader = ParquetObjectReader::new(store, meta);
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let metadata = builder.metadata();
println!("Schema: {:?}", builder.schema());
println!("Number of row groups: {}", metadata.num_row_groups());
println!("Total rows: {}", metadata.file_metadata().num_rows());
for (idx, rg) in metadata.row_groups().iter().enumerate() {
println!("Row Group {}: {} rows", idx, rg.num_rows());
for (col_idx, col) in rg.columns().iter().enumerate() {
if let Some(stats) = col.statistics() {
println!(" Column {}: min={:?}, max={:?}, null_count={:?}",
col_idx,
stats.min_bytes(),
stats.max_bytes(),
stats.null_count()
);
}
}
}
Ok(())
}
```
## Common Patterns
**Reading multiple files in parallel**:
```rust
use futures::stream::{self, StreamExt};
async fn read_multiple_files(
store: Arc<dyn ObjectStore>,
paths: Vec<String>,
) -> Result<Vec<RecordBatch>> {
let results = stream::iter(paths)
.map(|path| {
let store = store.clone();
async move {
read_parquet(store, &path).await
}
})
.buffer_unordered(10) // Process 10 files concurrently
.collect::<Vec<_>>()
.await;
// Flatten results
let mut all_batches = Vec::new();
for result in results {
all_batches.extend(result?);
}
Ok(all_batches)
}
```
**Reading partitioned data**:
```rust
async fn read_partition(
store: Arc<dyn ObjectStore>,
base_path: &str,
year: i32,
month: u32,
) -> Result<Vec<RecordBatch>> {
let partition_path = format!("{}/year={}/month={:02}/", base_path, year, month);
// List all files in partition
let prefix = Path::from(partition_path);
let files: Vec<_> = store.list(Some(&prefix))
.filter_map(|meta| async move {
meta.ok().and_then(|m| {
if m.location.as_ref().ends_with(".parquet") {
Some(m.location.to_string())
} else {
None
}
})
})
.collect()
.await;
// Read all files
read_multiple_files(store, files).await
}
```
## Best Practices
- **Use column projection** to read only needed columns (10x+ speedup for wide tables)
- **Stream large files** instead of collecting all batches into memory
- **Check metadata first** to understand file structure before reading
- **Use batch_size** to control memory usage (8192-65536 rows per batch)
- **Filter row groups** using statistics when possible
- **Read multiple files in parallel** for partitioned datasets
- **Handle schema evolution** by checking each file's schema before processing (see the sketch below)
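For the schema-evolution bullet, a minimal sketch that keeps only the requested columns which actually exist in a given file, so older files written before a column was added still read cleanly (column names are illustrative):
```rust
use arrow::datatypes::SchemaRef;

/// Filter a wanted-column list down to the columns present in this file's schema.
fn existing_columns<'a>(schema: &SchemaRef, wanted: &[&'a str]) -> Vec<&'a str> {
    wanted
        .iter()
        .copied()
        .filter(|name| schema.field_with_name(name).is_ok())
        .collect()
}
```
Pass the result to the projection helper from step 3 instead of the raw column list.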
## Troubleshooting
**Out of memory errors**:
- Reduce batch size: `.with_batch_size(4096)`
- Stream instead of collecting: process batches one at a time
- Use column projection to read fewer columns
**Slow reads**:
- Enable column projection if reading wide tables
- Check if row group filtering is possible
- Increase parallelism when reading multiple files
- Verify network connectivity to object store
**Schema mismatch**:
- Read metadata first to inspect actual schema
- Handle optional columns that may not exist in older files
- Use schema evolution strategies from DataFusion


@@ -0,0 +1,495 @@
---
description: Write Parquet files with optimal compression, encoding, and row group sizing
---
# Write Parquet Files
Help the user write Parquet files to object storage with production-quality settings for compression, encoding, row group sizing, and statistics.
## Steps
1. **Add required dependencies**:
```toml
[dependencies]
parquet = "52"
arrow = "52"
object_store = "0.9"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
```
2. **Create a basic Parquet writer**:
```rust
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::properties::WriterProperties;
use object_store::{ObjectStore, path::Path};
use arrow::record_batch::RecordBatch;
use arrow::datatypes::SchemaRef;
use std::sync::Arc;
async fn write_parquet(
store: Arc<dyn ObjectStore>,
path: &str,
batches: Vec<RecordBatch>,
schema: SchemaRef,
) -> Result<()> {
let path = Path::from(path);
// Create buffered writer for object store
let object_store_writer = object_store::buffered::BufWriter::new(
store.clone(),
path.clone()
);
// Create Arrow writer
let mut writer = AsyncArrowWriter::try_new(
object_store_writer,
schema,
None, // Use default properties
)?;
// Write batches
for batch in batches {
writer.write(&batch).await?;
}
// Close writer (flushes and finalizes file)
writer.close().await?;
Ok(())
}
```
3. **Configure writer properties** for production use:
```rust
use parquet::file::properties::{WriterProperties, WriterVersion};
use parquet::basic::{Compression, Encoding, ZstdLevel};
fn create_writer_properties() -> WriterProperties {
WriterProperties::builder()
// Use Parquet 2.0 format
.set_writer_version(WriterVersion::PARQUET_2_0)
// Compression: ZSTD level 3 (balanced)
.set_compression(Compression::ZSTD(
ZstdLevel::try_new(3).unwrap()
))
// Max rows per row group (tune so each group lands around 100MB-1GB on disk)
.set_max_row_group_size(100_000_000)
// Data page size: 1MB
.set_data_page_size_limit(1024 * 1024)
// Enable dictionary encoding
.set_dictionary_enabled(true)
// Write batch size
.set_write_batch_size(1024)
// Enable statistics for predicate pushdown
.set_statistics_enabled(parquet::file::properties::EnabledStatistics::Page)
// Metadata
.set_created_by("my-app v1.0".to_string())
.build()
}
async fn write_with_properties(
store: Arc<dyn ObjectStore>,
path: &str,
batches: Vec<RecordBatch>,
schema: SchemaRef,
) -> Result<()> {
let path = Path::from(path);
let writer_obj = object_store::buffered::BufWriter::new(store, path);
let props = create_writer_properties();
let mut writer = AsyncArrowWriter::try_new(
writer_obj,
schema,
Some(props),
)?;
for batch in batches {
writer.write(&batch).await?;
}
writer.close().await?;
Ok(())
}
```
4. **Set column-specific properties** for optimal encoding:
```rust
use parquet::schema::types::ColumnPath;
fn create_column_specific_properties() -> WriterProperties {
WriterProperties::builder()
// High-entropy data: use stronger compression
.set_column_compression(
ColumnPath::from("raw_data"),
Compression::ZSTD(ZstdLevel::try_new(6).unwrap()),
)
// Low-cardinality columns: enable dictionary encoding
// (request it via the dictionary flag; RLE_DICTIONARY cannot be passed to set_column_encoding)
.set_column_dictionary_enabled(
ColumnPath::from("category"),
true,
)
.set_column_compression(
ColumnPath::from("category"),
Compression::SNAPPY,
)
// Timestamp columns: use delta encoding
.set_column_encoding(
ColumnPath::from("timestamp"),
Encoding::DELTA_BINARY_PACKED,
)
// High-frequency data: faster compression
.set_column_compression(
ColumnPath::from("metric"),
Compression::SNAPPY,
)
.build()
}
```
5. **Implement streaming writes** for large datasets:
```rust
use futures::stream::{Stream, StreamExt};
async fn write_stream(
store: Arc<dyn ObjectStore>,
path: &str,
mut batch_stream: impl Stream<Item = Result<RecordBatch>> + Unpin,
schema: SchemaRef,
) -> Result<()> {
let path = Path::from(path);
let writer_obj = object_store::buffered::BufWriter::new(store, path);
let props = create_writer_properties();
let mut writer = AsyncArrowWriter::try_new(writer_obj, schema, Some(props))?;
// Write batches as they arrive
while let Some(batch) = batch_stream.next().await {
let batch = batch?;
writer.write(&batch).await?;
}
writer.close().await?;
Ok(())
}
```
6. **Implement partitioned writes**:
```rust
use chrono::{Datelike, NaiveDate}; // requires the `chrono` crate
use std::collections::HashMap;
async fn write_partitioned(
store: Arc<dyn ObjectStore>,
base_path: &str,
date: NaiveDate,
partition_id: usize,
batch: RecordBatch,
schema: SchemaRef,
) -> Result<()> {
// Create partitioned path: base/year=2024/month=01/day=15/part-00000.parquet
let path = format!(
"{}/year={}/month={:02}/day={:02}/part-{:05}.parquet",
base_path,
date.year(),
date.month(),
date.day(),
partition_id
);
write_parquet(store, &path, vec![batch], schema).await
}
// Write multiple partitions
async fn write_all_partitions(
store: Arc<dyn ObjectStore>,
base_path: &str,
partitioned_data: HashMap<NaiveDate, Vec<RecordBatch>>,
schema: SchemaRef,
) -> Result<()> {
for (date, batches) in partitioned_data {
for (partition_id, batch) in batches.into_iter().enumerate() {
write_partitioned(
store.clone(),
base_path,
date,
partition_id,
batch,
schema.clone(),
).await?;
}
}
Ok(())
}
```
7. **Add proper error handling and validation**:
```rust
use thiserror::Error;
#[derive(Error, Debug)]
enum ParquetWriteError {
#[error("Object store error: {0}")]
ObjectStore(#[from] object_store::Error),
#[error("Parquet error: {0}")]
Parquet(#[from] parquet::errors::ParquetError),
#[error("Arrow error: {0}")]
Arrow(#[from] arrow::error::ArrowError),
#[error("Empty batch: cannot write empty data")]
EmptyBatch,
#[error("Schema mismatch: {0}")]
SchemaMismatch(String),
}
async fn write_with_validation(
store: Arc<dyn ObjectStore>,
path: &str,
batches: Vec<RecordBatch>,
schema: SchemaRef,
) -> Result<(), ParquetWriteError> {
// Validate input
if batches.is_empty() {
return Err(ParquetWriteError::EmptyBatch);
}
// Verify schema consistency
for batch in &batches {
if batch.schema() != schema {
return Err(ParquetWriteError::SchemaMismatch(
format!("Batch schema does not match expected schema")
));
}
}
let path = Path::from(path);
let writer_obj = object_store::buffered::BufWriter::new(store, path);
let props = create_writer_properties();
let mut writer = AsyncArrowWriter::try_new(writer_obj, schema, Some(props))?;
for batch in batches {
writer.write(&batch).await?;
}
writer.close().await?;
Ok(())
}
```
## Performance Tuning
**Optimal row group sizing**:
```rust
use arrow::datatypes::{DataType, Schema};
// Calculate an appropriate row group size from the schema
fn calculate_row_group_size(schema: &Schema, target_bytes: usize) -> usize {
// Estimate bytes per row
let bytes_per_row: usize = schema
.fields()
.iter()
.map(|field| estimate_field_size(field.data_type()))
.sum();
// Target ~500MB per row group
target_bytes / bytes_per_row.max(1)
}
fn estimate_field_size(data_type: &DataType) -> usize {
match data_type {
DataType::Int32 => 4,
DataType::Int64 => 8,
DataType::Float64 => 8,
DataType::Utf8 => 50, // Estimate average string length
DataType::Timestamp(_, _) => 8,
DataType::Boolean => 1,
_ => 100, // Conservative estimate for complex types
}
}
let row_group_size = calculate_row_group_size(&schema, 500 * 1024 * 1024);
let props = WriterProperties::builder()
.set_max_row_group_size(row_group_size)
.build();
```
**Compression codec selection**:
```rust
fn choose_compression(use_case: CompressionUseCase) -> Compression {
match use_case {
CompressionUseCase::Balanced => Compression::ZSTD(ZstdLevel::try_new(3).unwrap()),
CompressionUseCase::MaxCompression => Compression::ZSTD(ZstdLevel::try_new(9).unwrap()),
CompressionUseCase::FastWrite => Compression::SNAPPY,
CompressionUseCase::FastRead => Compression::SNAPPY,
CompressionUseCase::Archive => Compression::ZSTD(ZstdLevel::try_new(19).unwrap()),
}
}
enum CompressionUseCase {
Balanced,
MaxCompression,
FastWrite,
FastRead,
Archive,
}
```
## Common Patterns
**Batching small records**:
```rust
// `IntoRecordBatch` and `records_to_batch` below are placeholders for your own
// record-to-Arrow conversion logic
async fn batch_and_write<T>(
store: Arc<dyn ObjectStore>,
path: &str,
records: Vec<T>,
schema: SchemaRef,
batch_size: usize,
) -> Result<()>
where
T: IntoRecordBatch,
{
let path = Path::from(path);
let writer_obj = object_store::buffered::BufWriter::new(store, path);
let props = create_writer_properties();
let mut writer = AsyncArrowWriter::try_new(writer_obj, schema.clone(), Some(props))?;
// Process in batches
for chunk in records.chunks(batch_size) {
let batch = records_to_batch(chunk, schema.clone())?;
writer.write(&batch).await?;
}
writer.close().await?;
Ok(())
}
```
**Append to existing files (via temp + rename)**:
```rust
// Parquet doesn't support appending, so read + rewrite
async fn append_to_parquet(
store: Arc<dyn ObjectStore>,
path: &str,
new_batches: Vec<RecordBatch>,
) -> Result<()> {
// 1. Read existing data
let existing_batches = read_parquet(store.clone(), path).await?;
// Reuse the existing file's schema (assumes at least one existing batch)
let schema = existing_batches[0].schema();
// 2. Combine with new data
let mut all_batches = existing_batches;
all_batches.extend(new_batches);
// 3. Write to temp location
let temp_path = format!("{}.tmp", path);
write_parquet(
store.clone(),
&temp_path,
all_batches,
schema,
).await?;
// 4. Atomic rename
let from = Path::from(temp_path);
let to = Path::from(path);
store.rename(&from, &to).await?;
Ok(())
}
```
**Writing with progress tracking**:
```rust
use indicatif::{ProgressBar, ProgressStyle}; // requires the `indicatif` crate
async fn write_with_progress(
store: Arc<dyn ObjectStore>,
path: &str,
batches: Vec<RecordBatch>,
schema: SchemaRef,
) -> Result<()> {
let pb = ProgressBar::new(batches.len() as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} {msg}")
.unwrap()
);
let path = Path::from(path);
let writer_obj = object_store::buffered::BufWriter::new(store, path);
let props = create_writer_properties();
let mut writer = AsyncArrowWriter::try_new(writer_obj, schema, Some(props))?;
for (idx, batch) in batches.iter().enumerate() {
writer.write(batch).await?;
pb.set_position(idx as u64 + 1);
pb.set_message(format!("{} rows written", batch.num_rows()));
}
writer.close().await?;
pb.finish_with_message("Complete");
Ok(())
}
```
## Best Practices
- **Use ZSTD(3) compression** for balanced performance (recommended for production)
- **Set row group size to 100MB-1GB** uncompressed for optimal S3 scanning
- **Enable statistics** for predicate pushdown optimization
- **Use dictionary encoding** for low-cardinality columns (categories, enums)
- **Write to temp location + rename** for atomic writes
- **Partition large datasets** by date or other logical grouping
- **Set column-specific properties** for heterogeneous data
- **Validate schema consistency** across all batches before writing
## Troubleshooting
**Slow writes**:
- Reduce compression level (use SNAPPY or ZSTD(1))
- Increase row group size to reduce overhead
- Use buffered writer (already included in examples)
- Write multiple files in parallel (see the sketch below)
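For the parallel-write bullet, a minimal sketch that fans out independent output files with bounded concurrency, reusing `write_parquet` from step 2 (the job layout and concurrency limit are illustrative):
```rust
use futures::stream::{self, StreamExt, TryStreamExt};

async fn write_files_in_parallel(
    store: Arc<dyn ObjectStore>,
    jobs: Vec<(String, Vec<RecordBatch>)>, // one (path, batches) pair per output file
    schema: SchemaRef,
) -> Result<()> {
    stream::iter(jobs)
        .map(|(path, batches)| {
            let store = store.clone();
            let schema = schema.clone();
            async move { write_parquet(store, &path, batches, schema).await }
        })
        .buffer_unordered(4) // write up to 4 files at a time
        .try_collect::<Vec<_>>()
        .await?;
    Ok(())
}
```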
**Large file sizes**:
- Increase compression level (ZSTD(6-9))
- Enable dictionary encoding for appropriate columns
- Check for redundant data that could be normalized
**Memory issues**:
- Reduce batch size
- Write smaller row groups
- Stream data instead of collecting all batches first
**Compatibility issues**:
- Use WriterVersion::PARQUET_2_0 for best compatibility
- Avoid advanced features if targeting older readers
- Test with target systems (Spark, Hive, etc.)
## Compression Comparison
| Codec | Write Speed | Read Speed | Ratio | Best For |
|-------|-------------|------------|-------|----------|
| Uncompressed | Fastest | Fastest | 1x | Development only |
| SNAPPY | Very Fast | Very Fast | 2-3x | Hot data, real-time |
| ZSTD(1) | Fast | Fast | 2.5-3x | High write throughput |
| ZSTD(3) | Fast | Fast | 3-4x | **Production default** |
| ZSTD(6) | Medium | Fast | 4-5x | Cold storage |
| ZSTD(9) | Slow | Fast | 5-6x | Archive, long-term |