description
Optimize Rust Lambda function for IO-intensive workloads with async patterns

You are helping the user optimize their Lambda function for IO-intensive workloads.

Your Task

Guide the user to optimize their Lambda for maximum IO performance using async/await patterns.

IO-Intensive Characteristics

Functions that:

  • Make multiple HTTP/API requests
  • Query databases
  • Read/write from S3, DynamoDB
  • Call external services
  • Process message queues
  • Send notifications

Goal: Maximize concurrency to reduce wall-clock time and cost

Key Optimization Strategies

1. Concurrent Operations

Replace sequential operations with concurrent ones:

Sequential (Slow):

async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
    // Each operation waits for previous one - slow!
    let user = fetch_user().await?;
    let posts = fetch_posts().await?;
    let comments = fetch_comments().await?;

    Ok(Response { user, posts, comments })
}

Concurrent (Fast):


async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
    // All operations run concurrently - fast!
    let (user, posts, comments) = tokio::try_join!(
        fetch_user(),
        fetch_posts(),
        fetch_comments(),
    )?;

    Ok(Response { user, posts, comments })
}

Performance impact: three sequential 100 ms calls take ~300 ms; run concurrently, they finish in ~100 ms (the duration of the slowest call).

2. Parallel Collection Processing

Sequential iteration:

let mut results = Vec::new();
for id in user_ids {
    let data = fetch_data(&id).await?;
    results.push(data);
}

Concurrent iteration:

use futures::future::try_join_all;

let futures = user_ids
    .iter()
    .map(|id| fetch_data(id));

let results = try_join_all(futures).await?;

Alternative with buffer (limits concurrency):

use futures::stream::{self, StreamExt};

let results = stream::iter(user_ids)
    .map(|id| async move { fetch_data(&id).await })
    .buffer_unordered(10)  // At most 10 requests in flight at a time
    .collect::<Vec<_>>()
    .await;
// Each element of `results` is a Result that still needs to be handled

3. Reuse Connections

Creating a new client on every call:

async fn fetch_data(url: &str) -> Result<Data, Error> {
    let client = reqwest::Client::new();  // New connection pool built on every call!
    Ok(client.get(url).send().await?.json().await?)
}

Shared client with connection pooling:

use std::sync::OnceLock;
use std::time::Duration;
use reqwest::Client;

// Initialized once per container
static HTTP_CLIENT: OnceLock<Client> = OnceLock::new();

fn get_client() -> &'static Client {
    HTTP_CLIENT.get_or_init(|| {
        Client::builder()
            .timeout(Duration::from_secs(30))
            .pool_max_idle_per_host(10)
            .build()
            .unwrap()
    })
}

async fn fetch_data(url: &str) -> Result<Data, Error> {
    let client = get_client();  // Reuses pooled connections across invocations
    Ok(client.get(url).send().await?.json().await?)
}

4. Database Connection Pooling

For PostgreSQL with sqlx:

use sqlx::{PgPool, postgres::PgPoolOptions};
use tokio::sync::OnceCell;

// std's OnceLock cannot run async initializers, so use tokio's OnceCell
// for pools that must be created with .await. Initialized once per container.
static DB_POOL: OnceCell<PgPool> = OnceCell::const_new();

async fn get_pool() -> &'static PgPool {
    DB_POOL.get_or_init(|| async {
        PgPoolOptions::new()
            .max_connections(5)  // Keep the pool small; one Lambda instance handles one event at a time
            .connect(&std::env::var("DATABASE_URL").unwrap())
            .await
            .unwrap()
    }).await
}

async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
    let pool = get_pool().await;
    let user_id = event.payload.user_id;  // assuming the Request payload carries a user_id

    // Run queries through the shared connection pool
    let user = sqlx::query_as!(User, "SELECT * FROM users WHERE id = $1", user_id)
        .fetch_one(pool)
        .await?;

    Ok(Response { user })
}

For DynamoDB:

use aws_sdk_dynamodb::Client;
use tokio::sync::OnceCell;

static DYNAMODB_CLIENT: OnceCell<Client> = OnceCell::const_new();

async fn get_dynamodb() -> &'static Client {
    DYNAMODB_CLIENT.get_or_init(|| async {
        let config = aws_config::load_from_env().await;
        Client::new(&config)
    }).await
}
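
A short usage sketch for the shared client (the table name and id key attribute here are illustrative assumptions, not part of the original):

use aws_sdk_dynamodb::types::AttributeValue;
use lambda_runtime::Error;
use std::collections::HashMap;

// Hypothetical helper: fetch a single item through the cached client.
async fn get_item_by_id(id: &str) -> Result<Option<HashMap<String, AttributeValue>>, Error> {
    let dynamodb = get_dynamodb().await;  // cheap after the first (cold) invocation

    let output = dynamodb
        .get_item()
        .table_name("MyTable")                         // placeholder table name
        .key("id", AttributeValue::S(id.to_string()))  // placeholder key attribute
        .send()
        .await?;

    Ok(output.item)
}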

5. Batch Operations

When possible, use batch APIs:

Individual requests:

for key in keys {
    let item = dynamodb.get_item()
        .table_name("MyTable")
        .key("id", AttributeValue::S(key))
        .send()
        .await?;
}

Batch request (BatchGetItem accepts up to 100 keys per call):

use aws_sdk_dynamodb::types::{AttributeValue, KeysAndAttributes};
use std::collections::HashMap;

let batch_keys: Vec<HashMap<String, AttributeValue>> = keys
    .iter()
    .map(|key| {
        HashMap::from([(
            "id".to_string(),
            AttributeValue::S(key.clone()),
        )])
    })
    .collect();

let response = dynamodb.batch_get_item()
    .request_items("MyTable", KeysAndAttributes::builder()
        .set_keys(Some(batch_keys))
        .build()?)
    .send()
    .await?;

Complete IO-Optimized Example

use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use serde::{Deserialize, Serialize};
use std::sync::OnceLock;
use std::time::Duration;
use reqwest::Client;
use futures::future::try_join_all;
use tracing::info;

// `User`, `Post`, and `FollowerResponse` are assumed to be defined elsewhere
// with the appropriate Serialize/Deserialize derives.

#[derive(Deserialize)]
struct Request {
    user_ids: Vec<String>,
}

#[derive(Serialize)]
struct Response {
    users: Vec<UserData>,
}

#[derive(Serialize)]
struct UserData {
    user: User,
    posts: Vec<Post>,
    followers: usize,
}

// Shared HTTP client with connection pooling
static HTTP_CLIENT: OnceLock<Client> = OnceLock::new();

fn get_client() -> &'static Client {
    HTTP_CLIENT.get_or_init(|| {
        Client::builder()
            .timeout(Duration::from_secs(30))
            .pool_max_idle_per_host(10)
            .tcp_keepalive(Duration::from_secs(60))
            .build()
            .unwrap()
    })
}

async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
    info!("Processing {} users", event.payload.user_ids.len());

    // Process all users concurrently
    let user_futures = event.payload.user_ids
        .into_iter()
        .map(|user_id| fetch_user_data(user_id));

    let users = try_join_all(user_futures).await?;

    Ok(Response { users })
}

async fn fetch_user_data(user_id: String) -> Result<UserData, Error> {
    let client = get_client();

    // Fetch all user data concurrently
    let (user, posts, followers) = tokio::try_join!(
        fetch_user(client, &user_id),
        fetch_posts(client, &user_id),
        fetch_follower_count(client, &user_id),
    )?;

    Ok(UserData { user, posts, followers })
}

async fn fetch_user(client: &Client, user_id: &str) -> Result<User, Error> {
    let response = client
        .get(format!("https://api.example.com/users/{}", user_id))
        .send()
        .await?
        .json()
        .await?;
    Ok(response)
}

async fn fetch_posts(client: &Client, user_id: &str) -> Result<Vec<Post>, Error> {
    let response = client
        .get(format!("https://api.example.com/users/{}/posts", user_id))
        .send()
        .await?
        .json()
        .await?;
    Ok(response)
}

async fn fetch_follower_count(client: &Client, user_id: &str) -> Result<usize, Error> {
    let response: FollowerResponse = client
        .get(format!("https://api.example.com/users/{}/followers", user_id))
        .send()
        .await?
        .json()
        .await?;
    Ok(response.count)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}

AWS SDK Optimization

S3 Concurrent Operations

use aws_sdk_s3::Client;
use aws_sdk_s3::primitives::AggregatedBytes;
use futures::future::try_join_all;
use lambda_runtime::Error;

async fn download_multiple_files(
    s3: &Client,
    bucket: &str,
    keys: Vec<String>,
) -> Result<Vec<AggregatedBytes>, Error> {
    let futures = keys.iter().map(|key| async move {
        let object = s3.get_object()
            .bucket(bucket)
            .key(key)
            .send()
            .await?;

        // Buffer this object's streaming body into memory
        let bytes = object.body.collect().await?;
        Ok::<_, Error>(bytes)
    });

    // All downloads run concurrently; the first error aborts the batch
    try_join_all(futures).await
}

DynamoDB Parallel Queries

use aws_sdk_dynamodb::{Client, types::AttributeValue};
use futures::future::try_join_all;
use lambda_runtime::Error;
use std::collections::HashMap;

type Item = HashMap<String, AttributeValue>;

async fn query_multiple_partitions(
    dynamodb: &Client,
    partition_keys: Vec<String>,
) -> Result<Vec<Item>, Error> {
    let futures = partition_keys.iter().map(|pk| {
        dynamodb
            .query()
            .table_name("MyTable")
            .key_condition_expression("pk = :pk")
            .expression_attribute_values(":pk", AttributeValue::S(pk.clone()))
            .send()
    });

    // Run one query per partition concurrently
    let results = try_join_all(futures).await?;

    // Flatten the per-partition results into a single item list
    let items = results
        .into_iter()
        .flat_map(|r| r.items.unwrap_or_default())
        .collect();

    Ok(items)
}

Timeout and Retry Configuration

use reqwest::Client;
use std::sync::OnceLock;
use std::time::Duration;

static HTTP_CLIENT: OnceLock<Client> = OnceLock::new();

fn get_client() -> &'static Client {
    HTTP_CLIENT.get_or_init(|| {
        Client::builder()
            .timeout(Duration::from_secs(30))      // Total request timeout
            .connect_timeout(Duration::from_secs(10)) // Connection timeout
            .pool_max_idle_per_host(10)            // Connection pool size
            .tcp_keepalive(Duration::from_secs(60)) // Keep connections alive
            .build()
            .unwrap()
    })
}

With retries (a simple manual loop; tower's retry middleware, listed as an optional dependency below, is a heavier-weight alternative):

use reqwest::Response;
use std::time::Duration;

async fn fetch_with_retry(url: &str) -> Result<Response, Error> {
    let client = get_client();

    for attempt in 1u64..=3 {
        match client.get(url).send().await {
            Ok(response) => return Ok(response),
            Err(_) if attempt < 3 => {
                // Linear backoff before retrying: 100ms, then 200ms
                tokio::time::sleep(Duration::from_millis(100 * attempt)).await;
                continue;
            }
            Err(e) => return Err(e.into()),
        }
    }

    unreachable!()
}

Streaming Large Responses

For large files or responses:

use aws_sdk_s3::Client;
use lambda_runtime::Error;

async fn download_large_file(s3: &Client, bucket: &str, key: &str) -> Result<(), Error> {
    let object = s3
        .get_object()
        .bucket(bucket)
        .key(key)
        .send()
        .await?;

    // Stream the body chunk by chunk instead of loading the entire file into memory
    let mut body = object.body;

    while let Some(chunk) = body.next().await {
        let chunk = chunk?;
        // Process each chunk as it arrives (process_chunk is application-defined)
        process_chunk(&chunk).await?;
    }

    Ok(())
}

Concurrency Limits

Control concurrency to avoid overwhelming external services:

use futures::stream::{self, StreamExt};

async fn process_with_limit(
    items: Vec<Item>,
    max_concurrent: usize,
) -> Result<Vec<Output>, Error> {
    let results = stream::iter(items)
        .map(|item| async move {
            process_item(item).await
        })
        .buffer_unordered(max_concurrent)  // Limit concurrent operations
        .collect::<Vec<_>>()
        .await;

    results.into_iter().collect()
}
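
As an alternative to the buffer_unordered approach above, a tokio::sync::Semaphore can enforce a concurrency cap that spans multiple call sites. This is a hedged sketch reusing the hypothetical Item, Output, and process_item names from the previous example:

use std::sync::Arc;
use tokio::sync::Semaphore;
use futures::future::try_join_all;
use lambda_runtime::Error;

async fn process_with_semaphore(
    items: Vec<Item>,
    max_concurrent: usize,
) -> Result<Vec<Output>, Error> {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));

    let futures = items.into_iter().map(|item| {
        let semaphore = Arc::clone(&semaphore);
        async move {
            // Wait for a permit; at most `max_concurrent` tasks run at once
            let _permit = semaphore.acquire().await?;
            process_item(item).await
        }
    });

    try_join_all(futures).await
}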

Error Handling for Async Operations

use std::time::Duration;
use thiserror::Error;

#[derive(Error, Debug)]
enum LambdaError {
    #[error("HTTP error: {0}")]
    Http(#[from] reqwest::Error),

    #[error("Database error: {0}")]
    Database(String),

    #[error("Timeout: operation took too long")]
    Timeout,
}

async fn function_handler(event: LambdaEvent<Request>) -> Result<Response, Error> {
    // Use timeout for async operations
    let result = tokio::time::timeout(
        Duration::from_secs(25),  // Stay below the configured Lambda timeout, leaving a buffer
        process_request(event.payload)
    )
    .await
    .map_err(|_| LambdaError::Timeout)??;

    Ok(result)
}

Performance Checklist

Apply these optimizations:

  • Use tokio::try_join! for fixed concurrent operations
  • Use futures::future::try_join_all for dynamic collections
  • Initialize clients/pools once with OnceLock
  • Configure connection pooling for HTTP clients
  • Use batch APIs when available
  • Set appropriate timeouts
  • Add retries for transient failures
  • Stream large responses
  • Limit concurrency to avoid overwhelming services
  • Use buffer_unordered for controlled parallelism
  • Avoid blocking operations in async context (see the spawn_blocking sketch after this list)
  • Monitor cold start times
  • Test with realistic event sizes
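
For the blocking-operations item above, a minimal sketch: move synchronous, CPU-heavy work off the async executor with tokio::task::spawn_blocking so in-flight IO futures keep making progress. The compress_payload helper is a hypothetical stand-in for any blocking function.

use lambda_runtime::Error;

// Hypothetical blocking function standing in for any synchronous, CPU-heavy work.
fn compress_payload(data: Vec<u8>) -> Vec<u8> {
    // ... expensive synchronous work ...
    data
}

async fn handle_payload(data: Vec<u8>) -> Result<Vec<u8>, Error> {
    // Run the blocking work on tokio's blocking thread pool so the
    // async executor thread stays free to drive concurrent IO.
    let compressed = tokio::task::spawn_blocking(move || compress_payload(data)).await?;
    Ok(compressed)
}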

Dependencies for IO Optimization

Add to Cargo.toml:

[dependencies]
lambda_runtime = "0.13"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time", "sync"] }
futures = "0.3"

# HTTP client
reqwest = { version = "0.12", features = ["json"] }

# AWS SDKs
aws-config = "1"
aws-sdk-s3 = "1"
aws-sdk-dynamodb = "1"

# Database (if needed)
sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres"] }

# Error handling
anyhow = "1"
thiserror = "1"

# Tracing
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

# Optional: retries
tower = { version = "0.4", features = ["retry", "timeout"] }

Testing IO Performance

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_concurrent_performance() {
        let start = std::time::Instant::now();

        // fetch_multiple_users is assumed to fan its requests out concurrently;
        // point it at a local mock server to keep this timing assertion deterministic.
        let results = fetch_multiple_users(vec!["1", "2", "3"]).await.unwrap();

        let duration = start.elapsed();

        // Should be ~100ms (concurrent), not ~300ms (sequential)
        assert!(duration.as_millis() < 150);
        assert_eq!(results.len(), 3);
    }
}

Monitoring

Add instrumentation to track IO performance:

use tracing::{info, instrument};

#[instrument(skip(client))]
async fn fetch_user(client: &Client, user_id: &str) -> Result<User, Error> {
    let start = std::time::Instant::now();

    let result = client
        .get(format!("https://api.example.com/users/{}", user_id))
        .send()
        .await?
        .json()
        .await?;

    info!(user_id, duration_ms = start.elapsed().as_millis() as u64, "User fetched");

    Ok(result)
}

After optimization, verify:

  • Cold start time (should be minimal)
  • Warm execution time (should be low due to concurrency)
  • Memory usage (should be moderate)
  • Error rates (should be low with retries)
  • CloudWatch metrics show improved performance