---
name: object-store-best-practices
description: Ensures proper cloud storage operations with retry logic, error handling, streaming, and efficient I/O patterns. Activates when users work with object_store for S3, Azure, or GCS operations.
allowed-tools: Read, Grep
version: 1.0.0
---
# Object Store Best Practices Skill
You are an expert at implementing robust cloud storage operations using the object_store crate. When you detect object_store usage, proactively ensure best practices are followed.
## When to Activate
Activate this skill when you notice:
- Code using the `ObjectStore` trait, `AmazonS3Builder`, `MicrosoftAzureBuilder`, or `GoogleCloudStorageBuilder`
- Discussion about S3, Azure Blob, or GCS operations
- Issues with cloud storage reliability, performance, or errors
- File uploads, downloads, or listing operations
- Questions about retry logic, error handling, or streaming
## Best Practices Checklist

### 1. Retry Configuration

**What to Look For:**
- Missing retry logic for production code
- Default settings without explicit retry configuration

**Good Pattern:**
```rust
use std::time::Duration;

use object_store::aws::AmazonS3Builder;
use object_store::RetryConfig;

let s3 = AmazonS3Builder::new()
    .with_region("us-east-1")
    .with_bucket_name("my-bucket")
    .with_retry(RetryConfig {
        max_retries: 3,
        retry_timeout: Duration::from_secs(10),
        ..Default::default()
    })
    .build()?;
```
**Bad Pattern:**
```rust
// No retry configuration - fails on transient errors
let s3 = AmazonS3Builder::new()
    .with_region("us-east-1")
    .with_bucket_name("my-bucket")
    .build()?;
```
**Suggestion:**
Cloud storage operations need retry logic for production resilience. Add retry configuration to handle transient failures:

```rust
.with_retry(RetryConfig {
    max_retries: 3,
    retry_timeout: Duration::from_secs(10),
    ..Default::default()
})
```

This handles `503 SlowDown` responses, network timeouts, and temporary outages.
### 2. Error Handling

**What to Look For:**
- Using `unwrap()` or `expect()` on storage operations
- Not handling specific error types
- Missing context in error propagation

**Good Pattern:**
```rust
use bytes::Bytes;
use object_store::path::Path;
use object_store::{Error as ObjectStoreError, ObjectStore};
use thiserror::Error;

#[derive(Error, Debug)]
enum StorageError {
    #[error("Object store error: {0}")]
    ObjectStore(#[from] ObjectStoreError),
    #[error("File not found: {path}")]
    NotFound { path: String },
    #[error("Access denied: {path}")]
    PermissionDenied { path: String },
}

async fn read_file(store: &dyn ObjectStore, path: &Path) -> Result<Bytes, StorageError> {
    match store.get(path).await {
        Ok(result) => Ok(result.bytes().await?),
        Err(ObjectStoreError::NotFound { path, .. }) => {
            Err(StorageError::NotFound { path })
        }
        Err(e) => Err(e.into()),
    }
}
```
**Bad Pattern:**

```rust
let data = store.get(&path).await.unwrap(); // Crashes on any error!
```
**Suggestion:**
Avoid `unwrap()` on storage operations. Use proper error handling:

```rust
match store.get(&path).await {
    Ok(result) => { /* handle success */ }
    Err(ObjectStoreError::NotFound { .. }) => { /* handle missing file */ }
    Err(e) => { /* handle other errors */ }
}
```

Or use thiserror for dedicated error types.
### 3. Streaming Large Objects

**What to Look For:**
- Loading entire files into memory with `.bytes().await`
- Not using streaming for large files (>100 MB)

**Good Pattern (Streaming):**
```rust
use futures::stream::StreamExt;

let result = store.get(&path).await?;
let mut stream = result.into_stream();

while let Some(chunk) = stream.next().await {
    let chunk = chunk?;
    // Process each chunk incrementally instead of buffering the object
    process_chunk(chunk)?;
}
```
**Bad Pattern (Loading into Memory):**

```rust
let result = store.get(&path).await?;
let bytes = result.bytes().await?; // Loads the entire file into memory!
```
**Suggestion:**
For files over ~100 MB, use streaming to avoid memory issues:

```rust
let mut stream = store.get(&path).await?.into_stream();
while let Some(chunk) = stream.next().await {
    let chunk = chunk?;
    process_chunk(chunk)?;
}
```

This processes data incrementally without loading everything into memory.
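As a concrete instance, a download can be streamed straight to a local file so peak memory stays at one chunk regardless of object size. A minimal sketch, reusing `store` and `path` from the examples above (the destination path is illustrative):

```rust
use futures::StreamExt;
use tokio::io::AsyncWriteExt;

// Stream the object to disk chunk by chunk; peak memory is one chunk
let mut stream = store.get(&path).await?.into_stream();
let mut file = tokio::fs::File::create("/tmp/download.bin").await?;

while let Some(chunk) = stream.next().await {
    file.write_all(&chunk?).await?;
}
file.flush().await?;
```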
### 4. Multipart Upload for Large Files

**What to Look For:**
- Using `put()` for large files (>100 MB)
- Missing multipart upload for big data

**Good Pattern:**
```rust
use bytes::Bytes;
use futures::{Stream, StreamExt};
use object_store::{path::Path, ObjectStore, Result};

async fn upload_large_file(
    store: &dyn ObjectStore,
    path: &Path,
    data: impl Stream<Item = Bytes> + Unpin,
) -> Result<()> {
    // put_multipart returns a MultipartUpload handle (object_store 0.10+)
    let mut multipart = store.put_multipart(path).await?;
    let mut stream = data;
    while let Some(chunk) = stream.next().await {
        // Each chunk converts into a PutPayload part
        multipart.put_part(chunk.into()).await?;
    }
    multipart.complete().await?;
    Ok(())
}
```
**Bad Pattern:**

```rust
// Buffers 1 GB in memory and uploads it in a single request
let large_data = vec![0u8; 1_000_000_000]; // 1 GB
store.put(path, large_data.into()).await?;
```
**Suggestion:**
For files over ~100 MB, use multipart upload for better reliability:

```rust
let mut multipart = store.put_multipart(&path).await?;
for chunk in chunks {
    multipart.put_part(chunk.into()).await?;
}
multipart.complete().await?;
```

Benefits:
- Failed uploads can be abandoned or resumed instead of restarted from scratch (see the cleanup sketch below)
- Better memory efficiency
- Improved reliability
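One practical consequence: if an upload fails partway, the multipart handle should be aborted so the store does not keep orphaned parts. A hedged sketch, assuming the object_store 0.10+ `MultipartUpload` API, with `chunks` as an illustrative source of `Bytes`:

```rust
// Sketch: abort a multipart upload on failure so already-uploaded
// parts are cleaned up server-side instead of being orphaned.
let mut multipart = store.put_multipart(&path).await?;
let mut outcome = Ok(());

for chunk in chunks {
    if let Err(e) = multipart.put_part(chunk.into()).await {
        outcome = Err(e);
        break;
    }
}

match outcome {
    Ok(()) => {
        multipart.complete().await?;
    }
    Err(e) => {
        // Discard the partial upload, then propagate the original error
        multipart.abort().await?;
        return Err(e.into());
    }
}
```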
### 5. Efficient Listing

**What to Look For:**
- Not using prefixes for listing
- Loading all results without pagination
- Not filtering results when only a subset is needed

**Good Pattern:**
```rust
use futures::stream::StreamExt;
use object_store::path::Path;

// List only objects under a prefix
let prefix = Path::from("data/2024/");
let mut list = store.list(Some(&prefix));

while let Some(meta) = list.next().await {
    let meta = meta?;
    if should_process(&meta) {
        process_object(&meta).await?;
    }
}
```
**Better Pattern with Filtering:**

```rust
use futures::{future, StreamExt};

let prefix = Path::from("data/2024/01/");
let list = store.list(Some(&prefix));

// Keep only .parquet files; pass errors through so they surface below
let filtered = list.filter(|result| {
    future::ready(match result {
        Ok(meta) => meta.location.as_ref().ends_with(".parquet"),
        Err(_) => true,
    })
});

futures::pin_mut!(filtered);
while let Some(meta) = filtered.next().await {
    let meta = meta?;
    process_object(&meta).await?;
}
```
**Bad Pattern:**

```rust
// Lists the entire bucket!
let all_objects: Vec<_> = store.list(None).collect().await;
```
**Suggestion:**
Use prefixes to limit LIST operations and reduce cost:

```rust
let prefix = Path::from("data/2024/01/");
let mut list = store.list(Some(&prefix));
```

This is especially important for buckets with millions of objects.
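When only one "directory" level is needed rather than a recursive listing, `list_with_delimiter` returns the immediate objects plus the common prefixes below the given path. A minimal sketch (the prefix value is illustrative):

```rust
use object_store::path::Path;

// Non-recursive listing: immediate objects plus "subdirectory" prefixes
let prefix = Path::from("data/2024/");
let listing = store.list_with_delimiter(Some(&prefix)).await?;

for object in &listing.objects {
    println!("object: {}", object.location);
}
for dir in &listing.common_prefixes {
    println!("prefix: {}", dir);
}
```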
### 6. Atomic Writes with Rename

**What to Look For:**
- Writing directly to the final location
- Risk of partial writes being visible to readers

**Good Pattern:**
```rust
use bytes::Bytes;
use object_store::{path::Path, ObjectStore, Result};

async fn atomic_write(
    store: &dyn ObjectStore,
    final_path: &Path,
    data: Bytes,
) -> Result<()> {
    // Write to a temporary location first
    let temp_path = Path::from(format!("{}.tmp", final_path));
    store.put(&temp_path, data.into()).await?;
    // Then rename, so readers only ever see the complete object
    store.rename(&temp_path, final_path).await?;
    Ok(())
}
```
**Bad Pattern:**

```rust
// Readers might see partial data during the write
store.put(&path, data).await?;
```
**Suggestion:**
Use temp + rename for atomic writes:

```rust
let temp_path = Path::from(format!("{}.tmp", path));
store.put(&temp_path, data.into()).await?;
store.rename(&temp_path, path).await?;
```

This prevents readers from seeing partial or corrupted data. Note that a single `put` is already atomic on most cloud object stores (S3 never exposes a partially written object, and implements rename as copy + delete), so this pattern matters most for filesystem-backed stores and multi-step commit protocols.
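Relatedly, when the concern is two writers racing on the same key rather than readers seeing partial data, newer object_store releases expose conditional puts. A hedged sketch, assuming the `PutMode`/`put_opts` API is available in your version:

```rust
use object_store::{PutMode, PutOptions};

// Create-only write: fails with Error::AlreadyExists if another
// writer got there first, instead of silently overwriting.
let opts = PutOptions {
    mode: PutMode::Create,
    ..Default::default()
};
store.put_opts(&path, data.into(), opts).await?;
```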
### 7. Connection Pooling

**What to Look For:**
- Creating a new client for each operation
- Not configuring connection limits

**Good Pattern:**
```rust
use std::sync::Arc;
use std::time::Duration;

use object_store::{aws::AmazonS3Builder, ClientOptions, ObjectStore};

let s3 = AmazonS3Builder::new()
    .with_client_options(
        ClientOptions::new()
            .with_timeout(Duration::from_secs(30))
            .with_connect_timeout(Duration::from_secs(5))
            .with_pool_max_idle_per_host(10),
    )
    .build()?;

// Build once and reuse this store across operations
let store: Arc<dyn ObjectStore> = Arc::new(s3);
```
**Bad Pattern:**

```rust
// Creating a new store for each operation
for file in files {
    let s3 = AmazonS3Builder::new().build()?;
    upload(s3, file).await?;
}
```
**Suggestion:**
Configure connection pooling and reuse one `ObjectStore` everywhere:

```rust
let store: Arc<dyn ObjectStore> = Arc::new(s3);

// Clone the Arc (cheap) to share the store across tasks
let store_clone = store.clone();
tokio::spawn(async move {
    upload(store_clone, file).await
});
```
### 8. Environment-Based Configuration

**What to Look For:**
- Hardcoded credentials or regions
- Missing environment variable support

**Good Pattern:**
```rust
use std::env;
use std::sync::Arc;

use object_store::{aws::AmazonS3Builder, ObjectStore, RetryConfig};

async fn create_s3_store() -> anyhow::Result<Arc<dyn ObjectStore>> {
    let region = env::var("AWS_REGION")
        .unwrap_or_else(|_| "us-east-1".to_string());
    let bucket = env::var("S3_BUCKET")?;

    let s3 = AmazonS3Builder::from_env() // Reads AWS_* env vars
        .with_region(&region)
        .with_bucket_name(&bucket)
        .with_retry(RetryConfig::default())
        .build()?;

    Ok(Arc::new(s3))
}
```
**Bad Pattern:**

```rust
// Hardcoded credentials
let s3 = AmazonS3Builder::new()
    .with_access_key_id("AKIAIOSFODNN7EXAMPLE") // Never do this!
    .with_secret_access_key("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
    .build()?;
```
**Suggestion:**
Use environment-based configuration for security:

```rust
let s3 = AmazonS3Builder::from_env() // Reads AWS credentials from the environment
    .with_bucket_name(&bucket)
    .build()?;
```

Or use IAM roles, instance profiles, or credential chains. Never hardcode credentials!
## Common Issues to Detect

### Issue 1: 503 SlowDown Errors
**Symptoms:** Intermittent 503 errors from S3

**Solution:**
S3 rate limiting is causing `503 SlowDown` responses. Add retry config:

```rust
.with_retry(RetryConfig {
    max_retries: 5,
    retry_timeout: Duration::from_secs(30),
    ..Default::default()
})
```

Also consider:
- Using S3 key prefixes to distribute load
- Implementing client-side backoff (see the sketch below)
- Requesting higher limits from AWS
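`RetryConfig` already backs off between attempts inside the store; client-side backoff above the store layer (for example around an entire batch job) can be hand-rolled. A minimal sketch, assuming an async context that returns a `Result`, with `do_batch` as a hypothetical placeholder for the fallible operation:

```rust
use std::time::Duration;

// Sketch: exponential backoff with a cap around an arbitrary
// fallible async operation (`do_batch` is a hypothetical placeholder).
let mut delay = Duration::from_millis(100);
let max_delay = Duration::from_secs(30);

for attempt in 1..=5 {
    match do_batch().await {
        Ok(_) => break,
        Err(e) if attempt < 5 => {
            eprintln!("attempt {attempt} failed: {e}; retrying in {delay:?}");
            tokio::time::sleep(delay).await;
            // Double the delay each time, capped at max_delay
            delay = (delay * 2).min(max_delay);
        }
        // Out of attempts: propagate the final error
        Err(e) => return Err(e),
    }
}
```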
### Issue 2: Connection Timeouts

**Symptoms:** Timeout errors on large operations

**Solution:**
Increase timeouts for large file operations:

```rust
.with_client_options(
    ClientOptions::new()
        .with_timeout(Duration::from_secs(300)) // 5 minutes
        .with_connect_timeout(Duration::from_secs(10)),
)
```
### Issue 3: Memory Growth When Streaming

**Symptoms:** Memory grows while processing many files

**Solution:**
Ensure streams are fully consumed and dropped:

```rust
let mut stream = store.get(&path).await?.into_stream();
while let Some(chunk) = stream.next().await {
    let chunk = chunk?;
    process_chunk(chunk)?;
    // chunk is dropped here, releasing its buffer
}
// stream is dropped here
```
### Issue 4: Missing Error Context

**Symptoms:** Hard to tell which operation failed

**Solution:**
Add context to errors (here via `anyhow::Context`):

```rust
use anyhow::Context;

store.get(&path).await
    .with_context(|| format!("Failed to read {}", path))?;
```

Or use custom error types with thiserror, as in section 2.
## Performance Optimization

### Parallel Operations
```rust
use futures::stream::{self, StreamExt};

// Upload multiple files in parallel
let uploads = files.iter().map(|file| {
    let store = store.clone();
    async move { store.put(&file.path, file.data.clone().into()).await }
});

// Run at most 10 uploads concurrently
let results = stream::iter(uploads)
    .buffer_unordered(10)
    .collect::<Vec<_>>()
    .await;

// Surface any failures instead of silently dropping them
for result in results {
    result?;
}
```
### Caching HEAD Requests
```rust
use std::collections::HashMap;

use object_store::{path::Path, ObjectMeta};

// Cache metadata to avoid repeated HEAD requests.
// Note: cached metadata can go stale if objects change.
let mut metadata_cache: HashMap<Path, ObjectMeta> = HashMap::new();

let meta = match metadata_cache.get(&path) {
    Some(meta) => meta.clone(), // Use cached metadata
    None => {
        let meta = store.head(&path).await?;
        metadata_cache.insert(path.clone(), meta.clone());
        meta
    }
};
```
### Prefetching
```rust
// Prefetch the next object while processing the current one.
// Rust futures are lazy: merely creating `store.get(...)` does not
// start the download, so spawn it to run in the background.
let mut next = Some(tokio::spawn({
    let store = store.clone(); // Arc<dyn ObjectStore>
    let path = paths[0].clone();
    async move { store.get(&path).await }
}));

for i in 0..paths.len() {
    // First ? handles the JoinError, second ? the object_store error
    let current = next.take().unwrap().await??;

    // Kick off the next fetch before doing any work
    if i + 1 < paths.len() {
        let store = store.clone();
        let path = paths[i + 1].clone();
        next = Some(tokio::spawn(async move { store.get(&path).await }));
    }

    // Process the current object while the next download is in flight
    process(current).await?;
}
```
## Testing Best Practices

### Use LocalFileSystem for Tests
```rust
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use object_store::local::LocalFileSystem;

    #[tokio::test]
    async fn test_pipeline() -> Result<(), Box<dyn std::error::Error>> {
        // Keep the TempDir binding alive so the directory isn't deleted early
        let dir = tempfile::tempdir()?;
        let store = LocalFileSystem::new_with_prefix(dir.path())?;

        // Test with local storage, no cloud costs
        run_pipeline(Arc::new(store)).await?;
        Ok(())
    }
}
```
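object_store also ships an `InMemory` store, useful when a test should never touch the disk at all. A sketch mirroring the test above, with `run_pipeline` as the assumed code under test:

```rust
use std::sync::Arc;

use object_store::memory::InMemory;

#[tokio::test]
async fn test_pipeline_in_memory() -> Result<(), Box<dyn std::error::Error>> {
    // Everything lives in RAM; nothing to clean up afterwards
    let store = InMemory::new();
    run_pipeline(Arc::new(store)).await?;
    Ok(())
}
```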
### Mock for Unit Tests
```rust
use mockall::mock;

mock! {
    Store {}

    #[async_trait::async_trait]
    impl ObjectStore for Store {
        async fn get(&self, location: &Path) -> Result<GetResult>;
        async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult>;
        // ... the remaining ObjectStore methods
    }
}
```
## Your Approach

- **Detect**: Identify object_store operations
- **Check**: Review against the best-practices checklist
- **Suggest**: Provide specific improvements for reliability
- **Prioritize**: Focus on retry logic, error handling, and streaming
- **Context**: Consider production vs. development environments
## Communication Style

- Emphasize reliability and production-readiness
- Explain the "why" behind best practices
- Provide code examples for fixes
- Consider cost implications (S3 requests, data transfer)
- Prioritize critical issues (no retry, hardcoded credentials, memory leaks)
When you see object_store usage, quickly check for common reliability issues and proactively suggest improvements that prevent production failures.