# Async Patterns and Concurrency

## Overview

**Core Principle:** Async code is about I/O concurrency, not CPU parallelism. Use async when waiting for network, files, or databases. Don't use async to speed up CPU-bound work.

Python's async/await (asyncio) enables single-threaded concurrency through cooperative multitasking. Structured concurrency (TaskGroup in 3.11+) makes async code safer and easier to reason about. The most common mistake: blocking the event loop with synchronous operations.

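To make "cooperative multitasking" concrete, here is a minimal sketch: two coroutines each await `asyncio.sleep`, so their waits overlap and the total runtime is roughly the longest single wait, not the sum.

```python
import asyncio
import time

async def wait_and_report(name: str, seconds: float) -> None:
    await asyncio.sleep(seconds)  # Yields to the event loop while waiting
    print(f"{name} finished after {seconds}s")

async def main() -> None:
    start = time.monotonic()
    async with asyncio.TaskGroup() as tg:  # Python 3.11+
        tg.create_task(wait_and_report("a", 1.0))
        tg.create_task(wait_and_report("b", 1.0))
    print(f"Total: {time.monotonic() - start:.1f}s")  # ~1.0s, not 2.0s

asyncio.run(main())
```
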
## When to Use

**Use this skill when:**

- "asyncio not working"
- "async/await errors"
- "Event loop issues"
- "Coroutine never awaited"
- "How to use TaskGroup?"
- "When to use async?"
- "Async code is slow"
- "Blocking the event loop"

**Don't use when:**

- CPU-bound work (use multiprocessing or threads)
- Setting up a project (use project-structure-and-tooling)
- Profiling needed (use debugging-and-profiling first)

**Symptoms triggering this skill:**

- RuntimeWarning: coroutine was never awaited
- Event loop errors
- Async functions not running concurrently
- Need to parallelize I/O operations

## Async Fundamentals

### When to Use Async vs Sync

```python
import asyncio
import aiohttp

# ❌ WRONG: Using async for CPU-bound work
async def calculate_fibonacci(n: int) -> int:
    if n < 2:
        return n
    return await calculate_fibonacci(n - 1) + await calculate_fibonacci(n - 2)
# Problem: No I/O, just CPU work. Async adds overhead without benefit.

# ✅ CORRECT: Use a regular function for CPU work
def calculate_fibonacci(n: int) -> int:
    if n < 2:
        return n
    return calculate_fibonacci(n - 1) + calculate_fibonacci(n - 2)

# ✅ CORRECT: Use async for I/O-bound work
async def fetch_user(session: aiohttp.ClientSession, user_id: int) -> dict:
    async with session.get(f"https://api.example.com/users/{user_id}") as resp:
        return await resp.json()
# Async shines: while waiting for the network response, other work can run

# ✅ CORRECT: Use async when orchestrating multiple I/O operations
async def fetch_all_users(user_ids: list[int]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_user(session, uid) for uid in user_ids]
        return await asyncio.gather(*tasks)
# Multiple network calls run concurrently
```

**Why this matters**: Async adds complexity. Only use it when you benefit from I/O concurrency. For CPU work, use threads or multiprocessing.

### Basic async/await Syntax

```python
# ❌ WRONG: Forgetting await
async def get_data():
    return fetch_from_api()  # Returns a coroutine, doesn't execute!

result = get_data()  # RuntimeWarning: coroutine never awaited
print(result)  # Prints <coroutine object>, not data

# ✅ CORRECT: Always await async functions
async def get_data():
    return await fetch_from_api()

# ✅ CORRECT: Running from sync code
import asyncio

def main():
    result = asyncio.run(get_data())
    print(result)

# ✅ CORRECT: Running from async code
async def main():
    result = await get_data()
    print(result)

asyncio.run(main())
```

**Why this matters**: Async functions return coroutines. You must `await` them to execute. `asyncio.run()` bridges the sync and async worlds.

### Running the Event Loop

```python
# ❌ WRONG: Running the event loop multiple times
import asyncio

asyncio.run(task1())
asyncio.run(task2())  # Creates a new event loop each time, inefficient

# ✅ CORRECT: Single event loop for all async work
async def main():
    await task1()
    await task2()

asyncio.run(main())

# ❌ WRONG: Mixing asyncio.run and manual loop management
loop = asyncio.get_event_loop()  # Deprecated outside a running loop (3.10+)
loop.run_until_complete(task1())
asyncio.run(task2())  # Conflicting loop management leads to confusing
                      # "event loop" errors

# ✅ CORRECT: Use asyncio.run() (Python 3.7+)
asyncio.run(main())

# ✅ CORRECT: For advanced cases, manual loop management
async def main():
    await task1()
    await task2()

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    loop.close()
```

**Why this matters**: `asyncio.run()` handles loop creation and cleanup automatically. Prefer it unless you need fine-grained control.

## Structured Concurrency with TaskGroup (Python 3.11+)

### TaskGroup Basics

```python
# ❌ WRONG: Creating tasks without proper cleanup (old style)
async def fetch_all_old(urls: list[str]) -> list[str]:
    tasks = []
    for url in urls:
        task = asyncio.create_task(fetch(url))
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    return results
# Problem: If one task fails, others continue. No automatic cleanup.

# ✅ CORRECT: TaskGroup (Python 3.11+)
async def fetch_all(urls: list[str]) -> list[str]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(url)) for url in urls]

    # When exiting the context, all tasks are guaranteed complete or cancelled
    return [task.result() for task in tasks]

# Why this matters: TaskGroup ensures:
# 1. All tasks complete before proceeding
# 2. If any task fails, all others are cancelled
# 3. Automatic cleanup, no leaked tasks
```

### Handling Errors with TaskGroup

```python
# ❌ WRONG: Silent failures with gather
async def process_all_gather(items: list[str]) -> list[str]:
    tasks = [asyncio.create_task(process(item)) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, Exception)]
# Problem: Errors silently ignored, hard to debug

# ✅ CORRECT: TaskGroup raises ExceptionGroup
async def process_all(items: list[str]) -> list[str]:
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process(item)) for item in items]
    return [task.result() for task in tasks]

# Usage with error handling (inside an async function)
try:
    results = await process_all(items)
except* ValueError as eg:
    # Handle all ValueErrors
    for exc in eg.exceptions:
        log.error(f"Validation error: {exc}")
except* ConnectionError as eg:
    # Handle all ConnectionErrors
    for exc in eg.exceptions:
        log.error(f"Network error: {exc}")

# ✅ CORRECT: Selective error handling with gather
async def process_with_fallback(items: list[str]) -> list[str]:
    tasks = [asyncio.create_task(process(item)) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    processed = []
    for item, result in zip(items, results):
        if isinstance(result, Exception):
            log.warning(f"Failed to process {item}: {result}")
            processed.append(None)  # Or a default value
        else:
            processed.append(result)
    return processed
```

**Why this matters**: TaskGroup provides structured concurrency with automatic cleanup. Use `gather` when you need partial results despite failures.

### Timeout Handling

```python
# ❌ WRONG: No timeout on I/O operations
async def fetch_data(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()
# Problem: Can hang forever if the server doesn't respond

# ✅ CORRECT: Timeout with asyncio.timeout (Python 3.11+)
async def fetch_data(url: str) -> str:
    async with asyncio.timeout(10.0):  # 10 second timeout
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                return await resp.text()
# Raises TimeoutError after 10 seconds

# ✅ CORRECT: Timeout on a TaskGroup
async def fetch_all_with_timeout(urls: list[str]) -> list[str]:
    async with asyncio.timeout(30.0):  # Total timeout
        async with asyncio.TaskGroup() as tg:
            tasks = [tg.create_task(fetch_data(url)) for url in urls]
    return [task.result() for task in tasks]

# ✅ CORRECT: Individual timeouts (Python <3.11)
async def fetch_with_timeout_old(url: str) -> str:
    try:
        return await asyncio.wait_for(fetch_data(url), timeout=10.0)
    except asyncio.TimeoutError:
        log.error(f"Timeout fetching {url}")
        raise
```

**Why this matters**: Always timeout I/O operations. Network calls can hang indefinitely. `asyncio.timeout()` (3.11+) is cleaner than `wait_for()`.

## Async Context Managers

### Basic Async Context Manager

```python
# ❌ WRONG: Using a sync context manager in async code
class DatabaseConnection:
    def __enter__(self):
        self.conn = connect_to_db()  # Blocking I/O!
        return self.conn

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.conn.close()  # Blocking I/O!

async def query():
    with DatabaseConnection() as conn:  # Blocks the event loop
        return await conn.query("SELECT * FROM users")

# ✅ CORRECT: Async context manager
class AsyncDatabaseConnection:
    async def __aenter__(self):
        self.conn = await async_connect_to_db()
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.conn.close()

async def query():
    async with AsyncDatabaseConnection() as conn:
        return await conn.query("SELECT * FROM users")
```

### Using contextlib for Async Context Managers

```python
from contextlib import asynccontextmanager

# ✅ CORRECT: Simple async context manager with a decorator
@asynccontextmanager
async def database_connection(host: str):
    conn = await connect_to_database(host)
    try:
        yield conn
    finally:
        await conn.close()

# Usage
async def fetch_users():
    async with database_connection("localhost") as conn:
        return await conn.query("SELECT * FROM users")

# ✅ CORRECT: Resource pool management
@asynccontextmanager
async def http_session():
    session = aiohttp.ClientSession()
    try:
        yield session
    finally:
        await session.close()

async def fetch_multiple(urls: list[str]):
    async with http_session() as session:
        tasks = [fetch_url(session, url) for url in urls]
        return await asyncio.gather(*tasks)
```

**Why this matters**: Async context managers ensure resources are cleaned up properly. Use `@asynccontextmanager` for simple cases and `__aenter__`/`__aexit__` for complex ones.

## Async Iterators and Generators

### Async Iterators

```python
# ❌ WRONG: Sync iterator doing async work
class DataFetcher:
    def __init__(self, ids: list[int]):
        self.ids = ids
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.index >= len(self.ids):
            raise StopIteration
        data = asyncio.run(fetch_data(self.ids[self.index]))  # Don't do this!
        self.index += 1
        return data

# ✅ CORRECT: Async iterator
class AsyncDataFetcher:
    def __init__(self, ids: list[int]):
        self.ids = ids
        self.index = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.index >= len(self.ids):
            raise StopAsyncIteration
        data = await fetch_data(self.ids[self.index])
        self.index += 1
        return data

# Usage
async def process_all():
    async for data in AsyncDataFetcher([1, 2, 3, 4]):
        print(data)
```

### Async Generators

```python
# ✅ CORRECT: Async generator (simpler than an iterator class)
async def fetch_users_paginated(page_size: int = 100):
    page = 0
    while True:
        users = await fetch_page(page, page_size)
        if not users:
            break
        for user in users:
            yield user
        page += 1

# Usage
async def process_all_users():
    async for user in fetch_users_paginated():
        await process_user(user)

# ✅ CORRECT: Async generator with cleanup
async def stream_file_lines(path: str):
    async with aiofiles.open(path) as f:
        async for line in f:
            yield line.strip()

# Usage with an async comprehension
async def load_data(path: str) -> list[str]:
    return [line async for line in stream_file_lines(path)]
```

**Why this matters**: Async iterators/generators enable streaming I/O-bound data without loading everything into memory. Essential for large datasets.

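One subtlety worth knowing: if you abandon an async generator early (for example by returning out of an `async for`), its cleanup can be deferred until garbage collection. `contextlib.aclosing` (stdlib, 3.10+) closes it deterministically. A short sketch reusing `stream_file_lines` from above:

```python
from contextlib import aclosing

async def first_matching_line(path: str, needle: str) -> str | None:
    # aclosing() guarantees the generator's finally block (which closes
    # the file) runs as soon as we leave this block, even on early exit
    async with aclosing(stream_file_lines(path)) as lines:
        async for line in lines:
            if needle in line:
                return line
    return None
```
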
## Common Async Pitfalls

### Blocking the Event Loop

```python
# ❌ WRONG: Blocking operations in an async function
import time
import requests

async def fetch_data(url: str) -> str:
    # Blocks the entire event loop for 2 seconds!
    time.sleep(2)

    # Also blocks the event loop (requests is synchronous)
    response = requests.get(url)
    return response.text

# ✅ CORRECT: Use async sleep and async HTTP
import asyncio
import aiohttp

async def fetch_data(url: str) -> str:
    await asyncio.sleep(2)  # Non-blocking sleep

    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

# ✅ CORRECT: If you must use blocking code, run it in an executor
import asyncio
import requests

async def fetch_data_sync(url: str) -> str:
    loop = asyncio.get_running_loop()

    # Run blocking code in the thread pool
    response = await loop.run_in_executor(
        None,  # Use the default executor
        requests.get,
        url,
    )
    return response.text

# ✅ CORRECT: CPU-bound work in a process pool
import concurrent.futures

async def heavy_computation(data: bytes) -> bytes:
    loop = asyncio.get_running_loop()

    # Run in a process pool for CPU work
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, process_data, data)
    return result
```

**Why this matters**: Blocking the event loop stops ALL async code. Use async libraries (aiohttp, not requests), async sleep, or run_in_executor for blocking code.

### Forgetting to Await

```python
# ❌ WRONG: Not awaiting async functions
async def main():
    fetch_data()  # Returns a coroutine, doesn't run!
    print("Done")

# ✅ CORRECT: Always await
async def main():
    await fetch_data()
    print("Done")

# ❌ WRONG: Collecting coroutines without running them
async def process_all(items: list[str]):
    results = [process_item(item) for item in items]  # List of coroutines!
    return results

# ✅ CORRECT: Await or gather
async def process_all(items: list[str]):
    tasks = [asyncio.create_task(process_item(item)) for item in items]
    return await asyncio.gather(*tasks)

# ✅ BETTER: TaskGroup (Python 3.11+)
async def process_all(items: list[str]):
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(process_item(item)) for item in items]
    return [task.result() for task in tasks]
```

### Shared Mutable State

```python
# ❌ WRONG: Shared mutable state without locks
counter = 0

async def increment():
    global counter
    temp = counter
    await asyncio.sleep(0)  # Yield control
    counter = temp + 1  # Race condition!

async def main():
    await asyncio.gather(*[increment() for _ in range(100)])
    print(counter)  # Not 100! Lost updates due to the race

# ✅ CORRECT: Use asyncio.Lock
counter = 0
lock = asyncio.Lock()

async def increment():
    global counter
    async with lock:
        temp = counter
        await asyncio.sleep(0)
        counter = temp + 1

async def main():
    await asyncio.gather(*[increment() for _ in range(100)])
    print(counter)  # 100, as expected

# ✅ BETTER: Avoid shared state
async def increment(current: int) -> int:
    await asyncio.sleep(0)
    return current + 1

async def main():
    results = await asyncio.gather(*[increment(i) for i in range(100)])
    print(sum(results))
```

**Why this matters**: Async code is concurrent. Race conditions exist. Use locks or avoid shared mutable state.

## Async Patterns

### Fire and Forget

```python
# ❌ WRONG: Creating a task without tracking it
async def main():
    asyncio.create_task(background_job())  # Task may not complete!
    return "Done"

# ✅ CORRECT: Track background tasks
background_tasks = set()

async def main():
    task = asyncio.create_task(background_job())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return "Done"

# ✅ CORRECT: Wait for background tasks before exit
async def main():
    task = asyncio.create_task(background_job())
    try:
        return "Done"
    finally:
        await task
```

### Retry with Exponential Backoff

```python
import asyncio
import random

# ❌ WRONG: Retry without delay
async def fetch_with_retry(url: str, max_retries: int = 3) -> str:
    for attempt in range(max_retries):
        try:
            return await fetch_data(url)
        except Exception:
            if attempt == max_retries - 1:
                raise
    # Hammers the server, no backoff

# ✅ CORRECT: Exponential backoff with jitter
async def fetch_with_retry(
    url: str,
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> str:
    for attempt in range(max_retries):
        try:
            return await fetch_data(url)
        except Exception as e:
            if attempt == max_retries - 1:
                raise

            # Exponential backoff with jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            log.warning(f"Retry {attempt + 1}/{max_retries} after {delay:.2f}s: {e}")
            await asyncio.sleep(delay)

    raise RuntimeError("Unreachable")
```

### Rate Limiting

```python
# ❌ WRONG: No rate limiting
async def fetch_all(urls: list[str]) -> list[str]:
    tasks = [asyncio.create_task(fetch(url)) for url in urls]
    return await asyncio.gather(*tasks)
# Can overwhelm a server with 1000s of concurrent requests

# ✅ CORRECT: Semaphore for a concurrent request limit
async def fetch_all(urls: list[str], max_concurrent: int = 10) -> list[str]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_sem(url: str) -> str:
        async with semaphore:
            return await fetch(url)

    tasks = [asyncio.create_task(fetch_with_sem(url)) for url in urls]
    return await asyncio.gather(*tasks)

# ✅ CORRECT: Token bucket rate limiting
import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, capacity: int):
        self.rate = rate  # Tokens per second
        self.capacity = capacity
        self.tokens = float(capacity)
        self.last_update = time.monotonic()  # Monotonic clock, safe off-loop
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_update
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_update = now

            if self.tokens < 1:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0  # The token generated while sleeping is spent
            else:
                self.tokens -= 1

# Usage
rate_limiter = RateLimiter(rate=10.0, capacity=10)  # 10 req/sec

async def fetch_with_limit(url: str) -> str:
    await rate_limiter.acquire()
    return await fetch(url)
```

**Why this matters**: Rate limiting prevents overwhelming servers and respects API limits. A semaphore limits concurrency; a token bucket smooths bursts.

### Async Queue for Producer/Consumer

```python
# ✅ CORRECT: Producer/consumer with asyncio.Queue
import asyncio

async def producer(queue: asyncio.Queue, items: list[str]):
    for item in items:
        await queue.put(item)
        await asyncio.sleep(0.1)  # Simulate work

    # Signal completion
    await queue.put(None)

async def consumer(queue: asyncio.Queue, consumer_id: int):
    while True:
        item = await queue.get()

        if item is None:
            queue.task_done()
            # Re-queue the sentinel for other consumers
            await queue.put(None)
            break

        print(f"Consumer {consumer_id} processing {item}")
        await asyncio.sleep(0.2)  # Simulate work
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    items = [f"item_{i}" for i in range(20)]

    # Start the producer and consumers. TaskGroup waits for all of them,
    # so every item has been processed by the time the context exits.
    # (Don't also call queue.join() here: the re-queued sentinel never
    # gets a matching task_done(), so join() would hang.)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, items))
        for i in range(3):
            tg.create_task(consumer(queue, i))

# ✅ CORRECT: Multiple producers, multiple consumers
async def worker(name: str, queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break

        await process_item(item)
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    items = [f"item_{i}" for i in range(20)]

    # Create workers
    workers = [asyncio.create_task(worker(f"worker-{i}", queue)) for i in range(5)]

    # Add work
    for item in items:
        await queue.put(item)

    # Wait for all work to be done
    await queue.join()

    # Stop workers
    for _ in workers:
        await queue.put(None)
    await asyncio.gather(*workers)
```

**Why this matters**: asyncio.Queue is not thread-safe, but it is safe to share between tasks on a single event loop. Perfect for producer/consumer patterns in async code.

## Threading vs Async vs Multiprocessing

### When to Use What

```python
import asyncio
import concurrent.futures

# CPU-bound work: Use multiprocessing
def cpu_bound(n: int) -> int:
    return sum(i * i for i in range(n))

async def process_cpu_tasks(data: list[int]) -> list[int]:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        results = await asyncio.gather(*[
            loop.run_in_executor(pool, cpu_bound, n) for n in data
        ])
    return results

# I/O-bound work: Use async
async def io_bound(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

async def process_io_tasks(urls: list[str]) -> list[str]:
    return await asyncio.gather(*[io_bound(url) for url in urls])

# Blocking I/O (no async library): Use threads
def blocking_io(path: str) -> str:
    with open(path) as f:  # Blocking file I/O
        return f.read()

async def process_files(paths: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        results = await asyncio.gather(*[
            loop.run_in_executor(pool, blocking_io, path) for path in paths
        ])
    return results
```

**Decision tree:**

```
Is work CPU-bound?
├─ Yes → multiprocessing (ProcessPoolExecutor)
└─ No → I/O-bound
    ├─ Async library available? → async/await
    └─ Only sync library? → threads (ThreadPoolExecutor)
```

### Combining Async and Threads

```python
# ✅ CORRECT: Running async code from sync code with its own loop
# (e.g. inside a worker thread)
import asyncio
import queue

def run_async_in_thread(coro):
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()

def sync_function():
    result = run_async_in_thread(async_operation())
    return result

# ✅ CORRECT: Thread-safe async queue
class AsyncThreadSafeQueue:
    def __init__(self):
        self._queue = queue.Queue()  # Thread-safe stdlib queue

    async def get(self):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._queue.get)

    async def put(self, item):
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self._queue.put, item)
```

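Worth noting alongside the executor pattern above: since Python 3.9, `asyncio.to_thread()` wraps the run_in_executor dance for the common case of calling one blocking function from async code. A small sketch (the `requests` call stands in for any blocking library):

```python
import asyncio
import requests

async def fetch_sync_library(url: str) -> str:
    # Runs requests.get in the default thread pool without blocking the loop
    response = await asyncio.to_thread(requests.get, url)
    return response.text
```
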
## Debugging Async Code

### Common Errors and Solutions

```python
# Error: RuntimeWarning: coroutine 'fetch' was never awaited
# ❌ WRONG:
async def main():
    fetch_data()  # Missing await

# ✅ CORRECT:
async def main():
    await fetch_data()

# Error: RuntimeError: Event loop is closed
# ❌ WRONG:
asyncio.run(coro1())
asyncio.run(coro2())  # New loop each time; objects created under the first
                      # loop (sessions, locks) fail when reused in the second

# ✅ CORRECT:
async def main():
    await coro1()
    await coro2()
asyncio.run(main())

# Error: RuntimeError: Task got Future attached to a different loop
# ❌ WRONG:
loop1 = asyncio.new_event_loop()
task = loop1.create_task(coro())
loop2 = asyncio.new_event_loop()
loop2.run_until_complete(task)  # Task belongs to a different loop!

# ✅ CORRECT: Use the same loop
loop = asyncio.new_event_loop()
task = loop.create_task(coro())
loop.run_until_complete(task)
```

### Enabling Debug Mode

```python
# Enable asyncio debug mode for better errors
import asyncio
import logging

# Method 1: Environment variable
# PYTHONASYNCIODEBUG=1 python script.py

# Method 2: In code
asyncio.run(main(), debug=True)

# Method 3: For an existing loop (inside a coroutine, get it with
# asyncio.get_running_loop() instead)
loop = asyncio.get_event_loop()
loop.set_debug(True)

# Configure logging
logging.basicConfig(level=logging.DEBUG)

# Debug mode enables:
# - Warnings for slow callbacks (>100ms)
# - Warnings for coroutines never awaited
# - Better stack traces
```

### Detecting Blocking Code

```python
# ✅ CORRECT: Monitor event loop lag
import asyncio
import time

class LoopMonitor:
    def __init__(self, threshold: float = 0.1):
        self.threshold = threshold
        self.last_check = time.monotonic()

    async def monitor(self):
        while True:
            now = time.monotonic()
            lag = now - self.last_check - 1.0  # Expecting a 1 second sleep

            if lag > self.threshold:
                log.warning(f"Event loop blocked for {lag:.3f}s")

            self.last_check = now
            await asyncio.sleep(1.0)

async def main():
    monitor = LoopMonitor()
    monitor_task = asyncio.create_task(monitor.monitor())  # Keep a reference

    # Your async code here
    await run_application()
```

## Async Libraries Ecosystem

### Essential Async Libraries

```python
# HTTP client
import aiohttp

async def fetch(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()

# File I/O
import aiofiles

async def read_file(path: str) -> str:
    async with aiofiles.open(path) as f:
        return await f.read()

# Database (PostgreSQL)
import asyncpg

async def query_db():
    conn = await asyncpg.connect('postgresql://user@localhost/db')
    try:
        rows = await conn.fetch('SELECT * FROM users')
        return rows
    finally:
        await conn.close()

# Redis (the standalone aioredis package is deprecated; redis-py now
# ships an asyncio client as redis.asyncio)
import redis.asyncio as redis

async def cache_get(key: str) -> str | None:
    client = redis.from_url('redis://localhost')
    try:
        value = await client.get(key)
        return value.decode() if value else None
    finally:
        await client.aclose()  # aclose() in redis-py 5+
```

### Async Testing with pytest-asyncio

```python
# Install: pip install pytest-asyncio

import pytest
import pytest_asyncio

# Mark async tests
@pytest.mark.asyncio
async def test_fetch_data():
    result = await fetch_data("https://api.example.com")
    assert result is not None

# Async fixture (in the default strict mode, use pytest_asyncio.fixture)
@pytest_asyncio.fixture
async def http_session():
    async with aiohttp.ClientSession() as session:
        yield session

@pytest.mark.asyncio
async def test_with_session(http_session):
    async with http_session.get("https://api.example.com") as resp:
        assert resp.status == 200
```

## Anti-Patterns

### Async Over Everything

```python
# ❌ WRONG: Making everything async without reason
async def calculate_total(prices: list[float]) -> float:
    total = 0.0
    for price in prices:
        total += price  # No I/O, no benefit from async
    return total

# ✅ CORRECT: Keep it sync when there's no I/O
def calculate_total(prices: list[float]) -> float:
    return sum(prices)

# ❌ WRONG: Async wrapper for a sync function
async def async_sum(numbers: list[int]) -> int:
    return sum(numbers)  # Why?

# ✅ CORRECT: Only async when doing I/O
async def fetch_and_sum(urls: list[str]) -> int:
    results = await asyncio.gather(*[fetch_number(url) for url in urls])
    return sum(results)  # sum() is sync, and that's fine
```

### Creating Too Many Tasks

```python
# ❌ WRONG: Creating millions of tasks
async def process_all(items: list[str]):  # 1M items
    tasks = [asyncio.create_task(process(item)) for item in items]
    return await asyncio.gather(*tasks)
# Problem: Creates 1M tasks, high memory usage

# ✅ CORRECT: Bounded concurrency with a semaphore
async def process_all(items: list[str], max_concurrent: int = 100):
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_sem(item: str):
        async with semaphore:
            return await process(item)

    return await asyncio.gather(*[process_with_sem(item) for item in items])

# ✅ BETTER: Process in batches
async def process_all(items: list[str], batch_size: int = 100):
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        batch_results = await asyncio.gather(*[process(item) for item in batch])
        results.extend(batch_results)
    return results
```

### Mixing Sync and Async Poorly

```python
# ❌ WRONG: Calling asyncio.run inside an async function
async def bad_function():
    result = asyncio.run(some_async_function())  # Error!
    return result

# ✅ CORRECT: Just await
async def good_function():
    result = await some_async_function()
    return result

# ❌ WRONG: Sync wrapper calling async repeatedly
def process_all_sync(items: list[str]) -> list[str]:
    return [asyncio.run(process(item)) for item in items]
# Creates a new event loop for each item!

# ✅ CORRECT: Single event loop
def process_all_sync(items: list[str]) -> list[str]:
    async def process_all_async():
        return await asyncio.gather(*[process(item) for item in items])

    return asyncio.run(process_all_async())
```

## Decision Trees

### Should I Use Async?

```
Does my code do I/O? (network, files, database)
├─ No → Don't use async (CPU-bound work)
└─ Yes → Does an async library exist?
    ├─ Yes → Use async/await
    └─ No → Can I use a sync library with threads?
        ├─ Yes → Use run_in_executor with ThreadPoolExecutor
        └─ No → Rethink approach or write an async wrapper
```

### Concurrent Execution Strategy

```
What am I waiting for?
├─ Network/database → async/await (asyncio)
├─ File I/O → async/await with aiofiles
├─ CPU computation → multiprocessing (ProcessPoolExecutor)
├─ Blocking library (no async version) → threads (ThreadPoolExecutor)
└─ Nothing (pure computation) → Regular sync code
```

### Error Handling in Concurrent Tasks

```
Do I need all results?
├─ Yes → TaskGroup (3.11+) or gather without return_exceptions
│   └─ Fails fast on first error
└─ No (partial results OK) → gather with return_exceptions=True
    └─ Filter exceptions from results
```

## Integration with Other Skills

**After using this skill:**

- If profiling async code → See @debugging-and-profiling for async profiling
- If testing async code → See @testing-and-quality for pytest-asyncio
- If setting up a project → See @project-structure-and-tooling for async dependencies

**Before using this skill:**

- If code is slow → Use @debugging-and-profiling to verify it's I/O-bound first
- If starting a project → Use @project-structure-and-tooling to set up dependencies

## Quick Reference

### Python 3.11+ Features

| Feature | Description | When to Use |
|---------|-------------|-------------|
| TaskGroup | Structured concurrency | Multiple concurrent tasks, automatic cleanup |
| asyncio.timeout() | Context manager for timeouts | Cleaner than wait_for() |
| except* | Exception group handling | Handle multiple concurrent errors |

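The three features compose naturally. A sketch (`fetch_data` here is a stand-in for any awaitable I/O call): a total deadline wraps a TaskGroup, and `except*` splits whatever ExceptionGroup falls out.

```python
import asyncio

async def fetch_data(url: str) -> str:
    await asyncio.sleep(0.1)  # Placeholder for real I/O
    return url

async def main() -> None:
    urls = ["https://a.example", "https://b.example"]
    try:
        async with asyncio.timeout(10.0):          # Total deadline
            async with asyncio.TaskGroup() as tg:  # Structured concurrency
                tasks = [tg.create_task(fetch_data(u)) for u in urls]
        print([t.result() for t in tasks])
    except* TimeoutError:
        print("Overall deadline exceeded")
    except* ConnectionError as eg:                 # Split the ExceptionGroup
        for exc in eg.exceptions:
            print(f"Network failure: {exc}")

asyncio.run(main())
```
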
### Common Async Patterns

```python
# Concurrent execution
async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(func(x)) for x in items]
results = [t.result() for t in tasks]

# Timeout
async with asyncio.timeout(10.0):
    result = await long_operation()

# Rate limiting
semaphore = asyncio.Semaphore(10)
async with semaphore:
    await rate_limited_operation()

# Retry with backoff
for attempt in range(max_retries):
    try:
        return await operation()
    except Exception:
        if attempt == max_retries - 1:
            raise
        await asyncio.sleep(2 ** attempt)
```

### When NOT to Use Async

- Pure computation (no I/O)
- Single I/O operation (overhead not worth it)
- CPU-bound work (use multiprocessing)
- When sync code is simpler and performance is acceptable