Asyncio Deep Dive: Mastering Asynchronous Python¶
AI Assisted (Claude Sonnet 4)
The Fundamental Problem: Why Asyncio Exists¶
Traditional Python code executes synchronously - one operation at a time, waiting for each to complete before starting the next. This creates a critical inefficiency problem:
import time
import requests

def sequential_downloads():
    """Traditional synchronous approach"""
    print("🐌 Starting sequential downloads...")
    start = time.time()

    # Each request blocks until complete
    response1 = requests.get("https://httpbin.org/delay/2")  # Wait 2 seconds
    print("✅ First download complete")

    response2 = requests.get("https://httpbin.org/delay/2")  # Wait another 2 seconds
    print("✅ Second download complete")

    response3 = requests.get("https://httpbin.org/delay/2")  # Wait another 2 seconds
    print("✅ Third download complete")

    total_time = time.time() - start
    print(f"⏱️ Total time: {total_time:.2f} seconds")  # ~6 seconds
    return [response1, response2, response3]

# sequential_downloads()  # Uncomment to see the slow approach
The inefficiency: while the first request is in flight, your program sits idle even though it could already have started the second and third requests.
Asyncio's solution: Enable cooperative multitasking where your program can juggle multiple operations, switching between them while waiting for I/O:
import asyncio
import aiohttp
import time

async def concurrent_downloads():
    """Asyncio's concurrent approach"""
    print("🚀 Starting concurrent downloads...")
    start = time.time()

    async with aiohttp.ClientSession() as session:
        # Start all three requests concurrently
        tasks = [
            session.get("https://httpbin.org/delay/2"),
            session.get("https://httpbin.org/delay/2"),
            session.get("https://httpbin.org/delay/2")
        ]
        # Wait for all to complete
        responses = await asyncio.gather(*tasks)

        # Process responses
        for i, response in enumerate(responses, 1):
            print(f"✅ Download {i} complete")
            response.close()

    total_time = time.time() - start
    print(f"⏱️ Total time: {total_time:.2f} seconds")  # ~2 seconds!
    return responses

asyncio.run(concurrent_downloads())
Key insight: Asyncio doesn't make individual operations faster - it eliminates wasted waiting time by running operations concurrently.
The Event Loop: The Heart of Asyncio¶
The event loop is asyncio's central coordinator - a single-threaded mechanism that manages and executes asynchronous tasks. Think of it as a highly efficient restaurant manager:
import asyncio
import time

async def restaurant_simulation():
    """Demonstrates how the event loop coordinates multiple operations"""

    async def serve_customer(customer_id, service_time):
        print(f"🍽️ Customer {customer_id}: Order taken")
        # This represents I/O waiting (kitchen preparing food)
        # The event loop can serve other customers during this time
        await asyncio.sleep(service_time)
        print(f"✅ Customer {customer_id}: Order ready! ({service_time}s)")
        return f"Meal for customer {customer_id}"

    print("🏪 Restaurant opens - Multiple customers arrive")

    # Customers with different service times
    customers = [
        serve_customer(1, 3.0),  # Slow order
        serve_customer(2, 1.0),  # Fast order
        serve_customer(3, 2.0),  # Medium order
        serve_customer(4, 1.5),  # Medium-fast order
    ]

    # Event loop serves all customers concurrently
    start_time = time.time()
    meals = await asyncio.gather(*customers)
    total_time = time.time() - start_time

    print(f"🏪 All customers served in {total_time:.1f}s")
    print("Notice: Faster orders completed first, regardless of arrival order!")
    return meals

asyncio.run(restaurant_simulation())
Event Loop Mechanics:
The event loop operates on a simple but powerful principle:
- Run ready tasks until they hit an await (suspension point)
- Track waiting tasks and note when they'll be ready to resume
- Resume tasks as their wait conditions are met
- Repeat until all tasks complete
Here's a simplified view of what happens internally:
# Conceptual view - don't run this code
class ConceptualEventLoop:
    def __init__(self):
        self.ready_tasks = []     # Tasks ready to run now
        self.waiting_tasks = []   # Tasks waiting for I/O or timers
        self.time_callbacks = []  # Scheduled callbacks

    def run_until_complete(self, main_task):
        """Main event loop execution"""
        self.ready_tasks.append(main_task)

        while self.ready_tasks or self.waiting_tasks:
            # Execute all ready tasks
            while self.ready_tasks:
                task = self.ready_tasks.pop(0)
                try:
                    # Run task until it hits 'await'
                    task.step()
                except TaskComplete:
                    continue  # Task finished
                except TaskSuspended as suspend_info:
                    # Task is waiting for something
                    self.waiting_tasks.append((task, suspend_info))

            # Check which waiting tasks are now ready
            self.check_waiting_tasks()

            # Handle timer callbacks
            self.handle_timer_callbacks()

    def check_waiting_tasks(self):
        """Move completed waits back to the ready queue"""
        still_waiting = []
        for task, wait_info in self.waiting_tasks:
            if wait_info.is_ready():
                self.ready_tasks.append(task)
            else:
                still_waiting.append((task, wait_info))
        self.waiting_tasks = still_waiting
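To see this ready-queue behavior in runnable form, a minimal sketch (not part of the conceptual model above) uses await asyncio.sleep(0) as a pure suspension point - each call hands control back to the loop, which then runs the next ready task:

import asyncio

async def worker(name, steps):
    for i in range(steps):
        print(f"{name}: step {i}")
        # sleep(0) suspends this coroutine and puts it back on the ready
        # queue, letting the loop run the other worker in between steps
        await asyncio.sleep(0)

async def main():
    # The loop interleaves the two workers at every await point
    await asyncio.gather(worker("A", 3), worker("B", 3))

asyncio.run(main())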
Coroutines: The Building Blocks¶
Coroutines are special functions that can be suspended and resumed. They're the fundamental units of work in asyncio.
Creating and Understanding Coroutines¶
import asyncio

# Regular function - runs to completion immediately
def regular_function(name):
    print(f"Regular function {name} starting")
    return f"Result from {name}"

# Coroutine function - can be suspended and resumed
async def coroutine_function(name):
    print(f"Coroutine {name} starting")
    # This is a suspension point - execution can pause here
    await asyncio.sleep(1)
    print(f"Coroutine {name} resuming after 1 second")
    return f"Async result from {name}"

async def demonstrate_coroutine_lifecycle():
    print("=== Regular Function ===")
    result = regular_function("RegularFunc")
    print(f"Result: {result}")

    print("\n=== Coroutine Function ===")
    # Calling a coroutine function doesn't run it!
    coro = coroutine_function("CoroFunc")
    print(f"Coroutine object created: {type(coro)}")

    # You must await it to actually run it
    result = await coro
    print(f"Result: {result}")

    print("\n=== Multiple Coroutines Concurrently ===")
    # Multiple coroutines can run together
    results = await asyncio.gather(
        coroutine_function("Coro1"),
        coroutine_function("Coro2"),
        coroutine_function("Coro3")
    )
    print(f"All results: {results}")

asyncio.run(demonstrate_coroutine_lifecycle())
Coroutine States and Execution Flow¶
import asyncio
import inspect

async def trace_coroutine_execution():
    """Demonstrates the different states a coroutine goes through"""

    async def traced_operation(operation_id):
        print(f"🚀 Operation {operation_id}: Starting")
        print(f"⏸️ Operation {operation_id}: About to suspend (await)")
        await asyncio.sleep(1)  # Suspension point
        print(f"▶️ Operation {operation_id}: Resumed after await")
        print(f"⏸️ Operation {operation_id}: Another suspension point")
        await asyncio.sleep(0.5)
        print(f"✅ Operation {operation_id}: Completed")
        return f"Result-{operation_id}"

    print("Creating coroutines (not running yet)...")
    coro1 = traced_operation("A")
    coro2 = traced_operation("B")
    print(f"Coroutine states: {inspect.getcoroutinestate(coro1)}")

    print("\nStarting concurrent execution...")
    results = await asyncio.gather(coro1, coro2)
    print(f"\nFinal results: {results}")

asyncio.run(trace_coroutine_execution())
Understanding await: The Suspension Mechanism¶
The await keyword is where the magic happens. It's not just syntax - it's the mechanism that allows cooperative multitasking.
What await Actually Does¶
import asyncio
import time

async def demonstrate_await_behavior():
    """Shows exactly what happens at await points"""
    print("🎬 Starting demonstration")

    print("📍 Point 1: Before first await")
    start_time = time.time()

    # This suspends the coroutine and yields control to the event loop
    await asyncio.sleep(1)
    print(f"📍 Point 2: After first await ({time.time() - start_time:.1f}s elapsed)")

    # Another suspension point
    await asyncio.sleep(0.5)
    print(f"📍 Point 3: After second await ({time.time() - start_time:.1f}s elapsed)")

    return "Demo complete"

async def demonstrate_concurrent_awaits():
    """Shows how multiple awaits can run concurrently"""

    async def worker(worker_id, work_duration):
        print(f"👷 Worker {worker_id}: Starting work ({work_duration}s)")
        await asyncio.sleep(work_duration)
        print(f"✅ Worker {worker_id}: Work complete")
        return f"Work result from {worker_id}"

    print("🏭 Starting workers concurrently...")
    start_time = time.time()

    # All workers start at the same time
    results = await asyncio.gather(
        worker("Alpha", 2.0),
        worker("Beta", 1.0),
        worker("Gamma", 1.5)
    )

    total_time = time.time() - start_time
    print(f"🏁 All work completed in {total_time:.1f}s")
    print(f"📊 Results: {results}")

async def main():
    await demonstrate_await_behavior()
    print("\n" + "="*50 + "\n")
    await demonstrate_concurrent_awaits()

asyncio.run(main())
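A subtlety worth internalizing: control only changes hands at await points. A coroutine that never awaits starves every other task, even inside asyncio. A minimal sketch illustrating this:

import asyncio
import time

async def hog():
    # No await inside this loop: the event loop cannot run anything
    # else until the coroutine returns
    end = time.time() + 1.0
    while time.time() < end:
        pass
    print("hog finished")

async def ticker():
    for _ in range(4):
        print("tick")
        await asyncio.sleep(0.25)

async def main():
    # The ticker cannot print until hog() returns - all ticks appear late
    await asyncio.gather(hog(), ticker())

asyncio.run(main())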
await Rules and Awaitables¶
Not everything can be awaited. Only awaitable objects work with await:
import asyncio
import time

class CustomAwaitable:
    """Example of creating a custom awaitable object"""

    def __init__(self, value, delay):
        self.value = value
        self.delay = delay

    def __await__(self):
        """This makes the object awaitable"""
        # Yield control to the event loop for the specified delay
        yield from asyncio.sleep(self.delay).__await__()
        # Return the final value
        return f"Custom result: {self.value}"

async def demonstrate_awaitables():
    print("=== Built-in Awaitables ===")

    # Coroutines are awaitable
    async def simple_coro():
        await asyncio.sleep(0.1)
        return "Coroutine result"

    result1 = await simple_coro()
    print(f"Coroutine result: {result1}")

    # Tasks are awaitable
    task = asyncio.create_task(simple_coro())
    result2 = await task
    print(f"Task result: {result2}")

    # Futures are awaitable
    future = asyncio.Future()
    future.set_result("Future result")
    result3 = await future
    print(f"Future result: {result3}")

    print("\n=== Custom Awaitable ===")
    custom = CustomAwaitable("Hello World", 0.5)
    result4 = await custom
    print(f"Custom awaitable result: {result4}")

    print("\n=== What's NOT Awaitable ===")
    try:
        # Regular functions aren't awaitable: time.sleep() runs (and blocks
        # the loop!), returns None, and awaiting None raises TypeError
        await time.sleep(0.1)  # ❌ TypeError
    except TypeError as e:
        print(f"Error: {e}")

asyncio.run(demonstrate_awaitables())
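When you genuinely need a blocking function like time.sleep() from async code, asyncio.to_thread() (Python 3.9+) runs it in a worker thread and hands back an awaitable. A small sketch:

import asyncio
import time

async def main():
    # time.sleep itself isn't awaitable, but to_thread() runs it in a
    # worker thread and returns an awaitable that resolves when it's done
    start = time.time()
    await asyncio.gather(
        asyncio.to_thread(time.sleep, 1),
        asyncio.to_thread(time.sleep, 1),
    )
    print(f"Two blocking sleeps overlapped: {time.time() - start:.1f}s")  # ~1s

asyncio.run(main())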
Tasks: Concurrent Execution Units¶
Tasks are the primary mechanism for running coroutines concurrently. They wrap coroutines and manage their execution within the event loop.
Understanding Tasks vs Coroutines¶
import asyncio
import time

async def demonstrate_tasks_vs_coroutines():
    """Shows the critical difference between creating coroutines and tasks"""

    async def background_work(work_id, duration):
        print(f"🔧 Work {work_id}: Starting ({duration}s)")
        for i in range(int(duration)):
            await asyncio.sleep(1)
            print(f"⚙️ Work {work_id}: Progress {i+1}/{int(duration)}")
        print(f"✅ Work {work_id}: Complete")
        return f"Result-{work_id}"

    print("=== Coroutines: Don't Start Until Awaited ===")
    start_time = time.time()

    # Creating coroutines doesn't start them
    coro1 = background_work("Coro-A", 2)
    coro2 = background_work("Coro-B", 2)
    print(f"⏱️ Coroutines created at {time.time() - start_time:.1f}s")

    await asyncio.sleep(1)  # Wait 1 second
    print("⏱️ 1 second later - no work has started yet")

    # Now run them sequentially
    result1 = await coro1  # Runs for 2 seconds
    result2 = await coro2  # Runs for another 2 seconds
    coro_time = time.time() - start_time
    print(f"⏱️ Coroutines completed in {coro_time:.1f}s (sequential)")

    print("\n=== Tasks: Start Immediately ===")
    start_time = time.time()

    # Creating tasks schedules them immediately!
    task1 = asyncio.create_task(background_work("Task-A", 2))
    task2 = asyncio.create_task(background_work("Task-B", 2))
    print(f"⏱️ Tasks created at {time.time() - start_time:.1f}s")

    await asyncio.sleep(1)  # Wait 1 second
    print("⏱️ 1 second later - work is already running!")

    # Wait for both to complete (they run concurrently)
    results = await asyncio.gather(task1, task2)
    task_time = time.time() - start_time
    print(f"⏱️ Tasks completed in {task_time:.1f}s (concurrent)")

    print(f"\n📊 Time difference: {coro_time - task_time:.1f}s saved with tasks!")

asyncio.run(demonstrate_tasks_vs_coroutines())
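On Python 3.11+, asyncio.TaskGroup offers a structured alternative to bare create_task(): tasks are tied to a scope that awaits all of them and cancels the siblings if one fails. A minimal sketch, assuming Python 3.11 or newer:

import asyncio

async def job(name, delay):
    await asyncio.sleep(delay)
    print(f"{name} done")
    return name

async def main():
    # Tasks start immediately, and the `async with` block doesn't exit
    # until every task has finished (or one has failed)
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(job("A", 1.0))
        t2 = tg.create_task(job("B", 0.5))
    print(t1.result(), t2.result())

asyncio.run(main())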
Task Management and Control¶
import asyncio
import random

async def demonstrate_task_management():
    """Shows how to manage, monitor, and control tasks"""

    async def worker_task(worker_id):
        """A worker that does some work and might fail"""
        try:
            work_time = random.uniform(1, 4)
            print(f"👷 Worker {worker_id}: Starting {work_time:.1f}s of work")

            # Simulate work with progress updates
            for i in range(int(work_time)):
                await asyncio.sleep(1)
                print(f"⚙️ Worker {worker_id}: {i+1}/{int(work_time)} seconds complete")

            # Simulate random failures
            if random.random() < 0.3:  # 30% chance of failure
                raise Exception(f"Worker {worker_id} encountered an error!")

            print(f"✅ Worker {worker_id}: Work completed successfully")
            return f"Success-{worker_id}"
        except asyncio.CancelledError:
            print(f"🛑 Worker {worker_id}: Cancelled during work")
            raise  # Re-raise to complete cancellation

    print("🏭 Starting worker management demo")

    # Create multiple tasks
    tasks = [
        asyncio.create_task(worker_task(f"W{i}"))
        for i in range(1, 6)
    ]

    # Monitor task progress
    async def monitor_tasks():
        while True:
            await asyncio.sleep(0.5)
            # Note: task.exception() raises if called on a cancelled task,
            # so check cancelled() first
            pending = [t for t in tasks if not t.done()]
            cancelled = [t for t in tasks if t.cancelled()]
            failed = [t for t in tasks if t.done() and not t.cancelled() and t.exception()]
            completed = [t for t in tasks if t.done() and not t.cancelled() and not t.exception()]
            print(f"📊 Status: {len(pending)} pending, {len(completed)} completed, "
                  f"{len(cancelled)} cancelled, {len(failed)} failed")
            if not pending:
                break

    monitor_task = asyncio.create_task(monitor_tasks())

    try:
        # Let tasks run for a bit
        await asyncio.sleep(2)

        # Cancel any remaining tasks that are taking too long
        for task in tasks:
            if not task.done():
                print("⏰ Cancelling slow task...")
                task.cancel()

        # Wait for all tasks to finish (including cancellation)
        results = await asyncio.gather(*tasks, return_exceptions=True)

        print("\n📋 Final Results:")
        for i, result in enumerate(results):
            if isinstance(result, asyncio.CancelledError):
                print(f"  Task {i+1}: Cancelled")
            elif isinstance(result, Exception):
                print(f"  Task {i+1}: Failed - {result}")
            else:
                print(f"  Task {i+1}: {result}")
    finally:
        monitor_task.cancel()
        try:
            await monitor_task
        except asyncio.CancelledError:
            pass

asyncio.run(demonstrate_task_management())
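Tasks can also be named and enumerated, which helps when monitoring many of them. A small sketch using the standard create_task(name=...), get_name(), and asyncio.all_tasks() APIs (available since Python 3.8):

import asyncio

async def job(delay):
    await asyncio.sleep(delay)

async def main():
    tasks = [
        asyncio.create_task(job(0.1 * (i + 1)), name=f"job-{i}")
        for i in range(3)
    ]
    await asyncio.sleep(0)  # let the tasks take their first step

    # all_tasks() returns every unfinished task in the running loop,
    # including the task that is running main() itself
    for task in sorted(asyncio.all_tasks(), key=lambda t: t.get_name()):
        print(task.get_name(), "done" if task.done() else "pending")

    await asyncio.gather(*tasks)

asyncio.run(main())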
Concurrent Execution Patterns¶
Asyncio provides several patterns for running multiple operations. Each has different use cases and behaviors.
asyncio.gather(): All or Nothing Coordination¶
import asyncio
import aiohttp
import time

async def fetch_url(session, url, name):
    """Fetch a URL and return info about the request"""
    start_time = time.time()
    print(f"🌐 {name}: Starting request to {url}")
    try:
        async with session.get(url) as response:
            response.raise_for_status()  # Treat 4xx/5xx responses as failures
            content = await response.text()
            duration = time.time() - start_time
            print(f"✅ {name}: Completed in {duration:.2f}s ({len(content)} chars)")
            return {
                "name": name,
                "url": url,
                "duration": duration,
                "size": len(content),
                "status": response.status
            }
    except Exception as e:
        duration = time.time() - start_time
        print(f"❌ {name}: Failed after {duration:.2f}s - {e}")
        raise

async def demonstrate_gather():
    """Shows gather() behavior with success and failure scenarios"""
    print("=== gather() with all successful requests ===")
    async with aiohttp.ClientSession() as session:
        try:
            results = await asyncio.gather(
                fetch_url(session, "https://httpbin.org/delay/1", "Fast"),
                fetch_url(session, "https://httpbin.org/delay/2", "Medium"),
                fetch_url(session, "https://httpbin.org/delay/1.5", "Quick")
            )
            print("📊 All requests successful:")
            for result in results:
                print(f"  {result['name']}: {result['duration']:.2f}s")
        except Exception as e:
            print(f"❌ gather() failed: {e}")

    print("\n=== gather() with one failure (default behavior) ===")
    async with aiohttp.ClientSession() as session:
        try:
            await asyncio.gather(
                fetch_url(session, "https://httpbin.org/delay/1", "Success1"),
                fetch_url(session, "https://httpbin.org/status/500", "Failure"),  # Will fail
                fetch_url(session, "https://httpbin.org/delay/1", "Success2")
            )
        except Exception as e:
            print(f"❌ gather() raised the first failure: {e}")

    print("\n=== gather() with return_exceptions=True ===")
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            fetch_url(session, "https://httpbin.org/delay/1", "Success1"),
            fetch_url(session, "https://httpbin.org/status/500", "Failure"),
            fetch_url(session, "https://httpbin.org/delay/1", "Success2"),
            return_exceptions=True  # Don't raise, return exceptions
        )
        print("📊 Mixed results (exceptions returned):")
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"  Request {i+1}: Failed - {result}")
            else:
                print(f"  Request {i+1}: Success - {result['name']}")

asyncio.run(demonstrate_gather())
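One property worth noting: gather() returns results in the order the awaitables were passed, not in completion order. A quick sketch:

import asyncio

async def delayed(value, delay):
    await asyncio.sleep(delay)
    return value

async def main():
    # "slow" finishes last but still comes back first in the result list
    results = await asyncio.gather(
        delayed("slow", 1.0),
        delayed("fast", 0.1),
    )
    print(results)  # ['slow', 'fast'] - argument order, not completion order

asyncio.run(main())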
asyncio.as_completed(): Process Results as They Arrive¶
import asyncio
import random
import time

async def variable_duration_task(task_id):
    """Task with a random duration to demonstrate as_completed behavior"""
    duration = random.uniform(0.5, 3.0)
    print(f"🚀 Task {task_id}: Starting ({duration:.1f}s estimated)")
    await asyncio.sleep(duration)
    print(f"✅ Task {task_id}: Completed after {duration:.1f}s")
    return {
        "task_id": task_id,
        "duration": duration,
        "completed_at": time.time()
    }

async def demonstrate_as_completed():
    """Shows how as_completed processes results in completion order"""
    print("🎯 Creating tasks with random durations")
    start_time = time.time()

    # Create tasks with different expected completion times
    tasks = [
        variable_duration_task(f"T{i}")
        for i in range(1, 6)
    ]

    print("⏳ Processing results as they complete (not in creation order):")
    completion_order = []

    # Process results as they become available
    for completed_task in asyncio.as_completed(tasks):
        result = await completed_task
        elapsed = time.time() - start_time
        completion_order.append(result['task_id'])
        print(f"📥 Received result from {result['task_id']} "
              f"after {elapsed:.1f}s total elapsed")

    total_time = time.time() - start_time
    print(f"\n📊 All tasks completed in {total_time:.1f}s")
    print(f"🔄 Completion order: {' → '.join(completion_order)}")
    print("💡 Notice: Results arrived in completion order, not creation order!")

async def demonstrate_as_completed_with_timeout():
    """Shows as_completed with timeout handling"""

    async def slow_task(task_id, duration):
        await asyncio.sleep(duration)
        return f"Task {task_id} result"

    print("\n" + "="*50)
    print("⏰ as_completed with timeout demonstration")

    # Wrap the coroutines in tasks so stragglers can be cancelled on timeout
    tasks = [
        asyncio.create_task(slow_task("A", 1)),
        asyncio.create_task(slow_task("B", 3)),  # Will time out
        asyncio.create_task(slow_task("C", 2)),
        asyncio.create_task(slow_task("D", 4)),  # Will time out
    ]

    completed_count = 0
    timeout_seconds = 2.5

    try:
        for completed_task in asyncio.as_completed(tasks, timeout=timeout_seconds):
            result = await completed_task
            completed_count += 1
            print(f"✅ Completed {completed_count}: {result}")
    except asyncio.TimeoutError:
        print(f"⏰ Overall timeout reached after {timeout_seconds}s")
        # Cancel whatever is still running so no tasks are left dangling
        for task in tasks:
            if not task.done():
                task.cancel()

    print(f"📊 Completed {completed_count} tasks before timeout")

async def main():
    await demonstrate_as_completed()
    await demonstrate_as_completed_with_timeout()

asyncio.run(main())
asyncio.wait(): Fine-Grained Control¶
import asyncio
import time

async def controlled_task(task_id, duration, should_fail=False):
    """Task for demonstrating wait() control features"""
    print(f"🚀 Task {task_id}: Starting")
    try:
        await asyncio.sleep(duration)
        if should_fail:
            raise ValueError(f"Task {task_id} intentionally failed")
        print(f"✅ Task {task_id}: Completed successfully")
        return f"Result from task {task_id}"
    except asyncio.CancelledError:
        print(f"🛑 Task {task_id}: Cancelled")
        raise

async def demonstrate_wait_conditions():
    """Shows different wait conditions and their behaviors"""
    print("=== FIRST_COMPLETED: Return when any task finishes ===")
    tasks = [
        asyncio.create_task(controlled_task("A", 1)),
        asyncio.create_task(controlled_task("B", 2)),
        asyncio.create_task(controlled_task("C", 3)),
    ]

    start_time = time.time()
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    elapsed = time.time() - start_time

    print(f"⏱️ First completion after {elapsed:.1f}s")
    print(f"📊 {len(done)} completed, {len(pending)} still running")

    # Get the result from the completed task
    for task in done:
        result = await task
        print(f"🎯 First result: {result}")

    # Cancel remaining tasks
    for task in pending:
        task.cancel()

    # Wait for cancellation to complete
    await asyncio.wait(pending)

    print("\n=== FIRST_EXCEPTION: Return when any task fails ===")
    tasks = [
        asyncio.create_task(controlled_task("X", 2, should_fail=False)),
        asyncio.create_task(controlled_task("Y", 1, should_fail=True)),  # Will fail
        asyncio.create_task(controlled_task("Z", 3, should_fail=False)),
    ]

    start_time = time.time()
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_EXCEPTION
    )
    elapsed = time.time() - start_time
    print(f"⏱️ First exception after {elapsed:.1f}s")

    # Check for exceptions
    for task in done:
        try:
            result = await task
            print(f"✅ Task completed: {result}")
        except Exception as e:
            print(f"❌ Task failed: {e}")

    # Cancel and wait for remaining tasks
    for task in pending:
        task.cancel()
    await asyncio.wait(pending)

async def demonstrate_wait_with_timeout():
    """Shows wait() with timeout functionality"""
    print("\n=== wait() with timeout ===")
    tasks = [
        asyncio.create_task(controlled_task("Fast", 1)),
        asyncio.create_task(controlled_task("Medium", 3)),
        asyncio.create_task(controlled_task("Slow", 5)),
    ]

    start_time = time.time()

    # Wait up to 2 seconds
    done, pending = await asyncio.wait(
        tasks,
        timeout=2.0,
        return_when=asyncio.ALL_COMPLETED
    )
    elapsed = time.time() - start_time

    print(f"⏱️ Timeout reached after {elapsed:.1f}s")
    print(f"📊 {len(done)} completed, {len(pending)} timed out")

    # Process completed tasks
    for task in done:
        result = await task
        print(f"✅ Completed before timeout: {result}")

    # Handle timed-out tasks
    for task in pending:
        print("⏰ Task timed out, cancelling...")
        task.cancel()

    # Wait for cancellation
    await asyncio.wait(pending)

async def main():
    await demonstrate_wait_conditions()
    await demonstrate_wait_with_timeout()

asyncio.run(main())
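Python 3.11 also added the asyncio.timeout() context manager, which puts a deadline on everything inside an async with block - often simpler than combining wait_for() or wait() with manual cancellation. A minimal sketch, assuming Python 3.11+:

import asyncio

async def main():
    try:
        # Everything inside this block must finish within 1 second;
        # on expiry the current task is cancelled and TimeoutError raised
        async with asyncio.timeout(1.0):
            await asyncio.sleep(0.4)
            print("first step done")
            await asyncio.sleep(2.0)  # pushes past the deadline
            print("never reached")
    except TimeoutError:
        print("section timed out")

asyncio.run(main())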
Synchronization: Coordinating Concurrent Operations¶
When multiple coroutines work with shared resources, you need synchronization primitives to prevent race conditions and coordinate access.
asyncio.Lock: Mutual Exclusion¶
import asyncio

class SharedCounter:
    """Demonstrates the need for locks in concurrent operations"""

    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()
        self.operation_log = []

    async def unsafe_increment(self, worker_id):
        """Unsafe increment that can cause race conditions"""
        # Read the current value
        old_value = self.value
        # Simulate some processing time (this is where race conditions occur)
        await asyncio.sleep(0.01)
        # Write back the incremented value
        self.value = old_value + 1
        self.operation_log.append(f"Worker {worker_id}: {old_value} → {self.value}")

    async def safe_increment(self, worker_id):
        """Safe increment using a lock"""
        async with self.lock:  # Only one worker can execute this block at a time
            old_value = self.value
            # Even with a delay, no race condition occurs
            await asyncio.sleep(0.01)
            self.value = old_value + 1
            self.operation_log.append(f"Worker {worker_id}: {old_value} → {self.value}")

async def demonstrate_race_conditions():
    """Shows the difference between safe and unsafe concurrent operations"""
    print("=== Race Condition Demonstration ===")

    # Test unsafe operations
    unsafe_counter = SharedCounter()
    print("🚨 Running UNSAFE concurrent increments...")
    tasks = [
        unsafe_counter.unsafe_increment(f"U{i}")
        for i in range(10)
    ]
    await asyncio.gather(*tasks)

    print(f"❌ Expected final value: 10, Actual: {unsafe_counter.value}")
    print("📋 Last few operations:")
    for log_entry in unsafe_counter.operation_log[-5:]:
        print(f"  {log_entry}")

    # Test safe operations
    safe_counter = SharedCounter()
    print("\n✅ Running SAFE concurrent increments...")
    tasks = [
        safe_counter.safe_increment(f"S{i}")
        for i in range(10)
    ]
    await asyncio.gather(*tasks)

    print(f"✅ Expected final value: 10, Actual: {safe_counter.value}")
    print("📋 Last few operations:")
    for log_entry in safe_counter.operation_log[-5:]:
        print(f"  {log_entry}")

async def demonstrate_lock_usage_patterns():
    """Shows different patterns for using locks effectively"""
    lock = asyncio.Lock()
    shared_resource = {"data": [], "processing_count": 0}

    async def producer(producer_id):
        """Produces data items"""
        for i in range(3):
            item = f"P{producer_id}-Item{i}"
            async with lock:
                shared_resource["data"].append(item)
                print(f"📦 Producer {producer_id} added: {item}")
            await asyncio.sleep(0.1)  # Simulate work between productions

    async def processor():
        """Processes data items"""
        processed_items = []
        while len(processed_items) < 9:  # Expect 3 producers × 3 items each
            async with lock:
                if shared_resource["data"]:
                    item = shared_resource["data"].pop(0)
                    shared_resource["processing_count"] += 1
                    current_count = shared_resource["processing_count"]
                else:
                    item = None

            if item:
                # Process outside the lock to allow other operations
                await asyncio.sleep(0.05)  # Simulate processing time
                processed_items.append(item)
                print(f"🔧 Processed: {item} (#{current_count})")
            else:
                # No items available, wait a bit
                await asyncio.sleep(0.01)
        return processed_items

    print("\n=== Producer-Consumer with Lock ===")
    # Start producers and the processor concurrently
    producer_tasks = [producer(i) for i in range(1, 4)]
    processor_task = processor()

    results = await asyncio.gather(*producer_tasks, processor_task)
    processed_items = results[-1]  # Last result is from the processor
    print(f"📊 Processing complete: {len(processed_items)} items processed")

async def main():
    await demonstrate_race_conditions()
    await demonstrate_lock_usage_patterns()

asyncio.run(main())
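For producer-consumer pipelines like the one above, asyncio.Queue is usually the more idiomatic tool - it handles the locking and the waiting internally. A minimal sketch of the same pipeline built on a queue:

import asyncio

async def producer(queue, producer_id):
    for i in range(3):
        item = f"P{producer_id}-Item{i}"
        await queue.put(item)  # wakes a waiting consumer if there is one
        await asyncio.sleep(0.1)

async def consumer(queue):
    while True:
        item = await queue.get()   # suspends until an item is available
        await asyncio.sleep(0.05)  # simulate processing
        print(f"Processed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    consumers = [asyncio.create_task(consumer(queue)) for _ in range(2)]

    await asyncio.gather(*[producer(queue, i) for i in range(1, 4)])
    await queue.join()  # wait until every queued item has been processed

    for c in consumers:
        c.cancel()  # the consumers loop forever, so stop them explicitly
    await asyncio.gather(*consumers, return_exceptions=True)

asyncio.run(main())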
asyncio.Semaphore: Limiting Concurrent Access¶
import asyncio
import aiohttp
import time

class RateLimitedDownloader:
    """Demonstrates semaphore usage for controlling concurrent operations"""

    def __init__(self, max_concurrent_downloads=3):
        self.semaphore = asyncio.Semaphore(max_concurrent_downloads)
        self.download_stats = {
            "started": 0,
            "completed": 0,
            "failed": 0,
            "concurrent_peak": 0,
            "currently_running": 0
        }
        self.stats_lock = asyncio.Lock()

    async def download_file(self, session, url, file_id):
        """Download a file with concurrency limiting"""
        # Wait for permission to start the download
        async with self.semaphore:
            # Update stats
            async with self.stats_lock:
                self.download_stats["started"] += 1
                self.download_stats["currently_running"] += 1
                if self.download_stats["currently_running"] > self.download_stats["concurrent_peak"]:
                    self.download_stats["concurrent_peak"] = self.download_stats["currently_running"]

            print(f"🌐 Download {file_id}: Starting (concurrent: {self.download_stats['currently_running']})")

            try:
                # Simulate download
                start_time = time.time()
                async with session.get(url) as response:
                    data = await response.read()
                download_time = time.time() - start_time

                # Simulate file processing
                await asyncio.sleep(0.1)

                async with self.stats_lock:
                    self.download_stats["completed"] += 1
                    self.download_stats["currently_running"] -= 1

                print(f"✅ Download {file_id}: Complete ({len(data)} bytes in {download_time:.2f}s)")
                return {"file_id": file_id, "size": len(data), "duration": download_time}
            except Exception as e:
                async with self.stats_lock:
                    self.download_stats["failed"] += 1
                    self.download_stats["currently_running"] -= 1
                print(f"❌ Download {file_id}: Failed - {e}")
                raise

async def demonstrate_semaphore_benefits():
    """Shows how semaphores prevent overwhelming services"""
    print("=== Semaphore: Controlling Concurrent Downloads ===")

    # Create a downloader with a limit of 3 concurrent downloads
    downloader = RateLimitedDownloader(max_concurrent_downloads=3)

    # Prepare many download URLs
    urls = ["https://httpbin.org/bytes/1024" for _ in range(10)]

    async with aiohttp.ClientSession() as session:
        # Start all downloads (but the semaphore limits concurrency)
        download_tasks = [
            downloader.download_file(session, url, f"File-{i+1}")
            for i, url in enumerate(urls)
        ]

        start_time = time.time()
        results = await asyncio.gather(*download_tasks, return_exceptions=True)
        total_time = time.time() - start_time

        # Analyze results
        successful = [r for r in results if isinstance(r, dict)]
        failed = [r for r in results if isinstance(r, Exception)]

        print("\n📊 Download Statistics:")
        print(f"  Total downloads: {len(urls)}")
        print(f"  Successful: {len(successful)}")
        print(f"  Failed: {len(failed)}")
        print(f"  Total time: {total_time:.2f}s")
        print(f"  Peak concurrent: {downloader.download_stats['concurrent_peak']}")
        print(f"  Average time per download: {total_time/len(successful):.2f}s")

async def demonstrate_semaphore_vs_unlimited():
    """Compares semaphore-limited vs unlimited concurrent access"""

    async def unlimited_downloads():
        """Downloads without any concurrency limiting"""
        async def simple_download(session, url, file_id):
            start_time = time.time()
            async with session.get(url) as response:
                await response.read()
            duration = time.time() - start_time
            print(f"🚀 Unlimited {file_id}: {duration:.2f}s")
            return duration

        async with aiohttp.ClientSession() as session:
            tasks = [
                simple_download(session, "https://httpbin.org/delay/1", f"U{i}")
                for i in range(8)
            ]
            return await asyncio.gather(*tasks)

    async def limited_downloads():
        """Downloads with a semaphore limiting to 3 concurrent"""
        semaphore = asyncio.Semaphore(3)

        async def limited_download(session, url, file_id):
            async with semaphore:
                start_time = time.time()
                async with session.get(url) as response:
                    await response.read()
                duration = time.time() - start_time
                print(f"🎛️ Limited {file_id}: {duration:.2f}s")
                return duration

        async with aiohttp.ClientSession() as session:
            tasks = [
                limited_download(session, "https://httpbin.org/delay/1", f"L{i}")
                for i in range(8)
            ]
            return await asyncio.gather(*tasks)

    print("\n" + "="*50)
    print("⚡ Unlimited Concurrent Downloads (may overwhelm server):")
    start_time = time.time()
    unlimited_results = await unlimited_downloads()
    unlimited_time = time.time() - start_time

    print("🎛️ Limited Concurrent Downloads (3 at a time):")
    start_time = time.time()
    limited_results = await limited_downloads()
    limited_time = time.time() - start_time

    print("\n📈 Comparison:")
    print(f"  Unlimited: {unlimited_time:.2f}s total, avg {sum(unlimited_results)/len(unlimited_results):.2f}s per download")
    print(f"  Limited: {limited_time:.2f}s total, avg {sum(limited_results)/len(limited_results):.2f}s per download")

async def main():
    await demonstrate_semaphore_benefits()
    await demonstrate_semaphore_vs_unlimited()

asyncio.run(main())
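This guard pattern is easy to package as a reusable helper. The sketch below is one common way to write it (gather_with_limit is a hypothetical name, not a standard-library function):

import asyncio

async def gather_with_limit(limit, *coros):
    """Run coroutines concurrently, but at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with semaphore:  # blocks while `limit` others are running
            return await coro

    return await asyncio.gather(*(guarded(c) for c in coros))

async def job(i):
    await asyncio.sleep(0.2)
    return i

async def main():
    results = await gather_with_limit(3, *(job(i) for i in range(10)))
    print(results)  # [0, 1, ..., 9], never more than 3 in flight

asyncio.run(main())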
asyncio.Event: Signaling and Coordination¶
import asyncio
import random

async def demonstrate_event_coordination():
    """Shows how events coordinate multiple coroutines"""
    # Events for coordination
    start_event = asyncio.Event()
    data_ready_event = asyncio.Event()
    processing_complete_event = asyncio.Event()

    # Shared data
    shared_data = {"items": [], "processed_items": []}

    async def data_producer():
        """Produces data and signals when ready"""
        print("📦 Producer: Waiting for start signal...")
        await start_event.wait()

        print("📦 Producer: Starting data generation...")
        for i in range(5):
            item = f"Data-{i}"
            shared_data["items"].append(item)
            print(f"📦 Producer: Generated {item}")
            await asyncio.sleep(0.2)  # Simulate generation time

        print("📦 Producer: All data generated, signaling data_ready")
        data_ready_event.set()

    async def data_processor(processor_id):
        """Processes data after it's ready"""
        print(f"🔧 Processor {processor_id}: Waiting for data...")
        await data_ready_event.wait()

        print(f"🔧 Processor {processor_id}: Data is ready, starting processing...")
        while shared_data["items"]:
            if shared_data["items"]:  # Double-check in case another processor took it
                item = shared_data["items"].pop(0)
                # Simulate processing
                processing_time = random.uniform(0.1, 0.3)
                await asyncio.sleep(processing_time)
                processed_item = f"Processed-{item}-by-P{processor_id}"
                shared_data["processed_items"].append(processed_item)
                print(f"🔧 Processor {processor_id}: Completed {processed_item}")

        print(f"🔧 Processor {processor_id}: No more items to process")

        # Check if all processing is done
        if not shared_data["items"] and len(shared_data["processed_items"]) == 5:
            print(f"🔧 Processor {processor_id}: All processing complete, signaling completion")
            processing_complete_event.set()

    async def coordinator():
        """Coordinates the entire process"""
        print("👨💼 Coordinator: Setting up workers...")

        # Start producer and processors
        producer_task = asyncio.create_task(data_producer())
        processor_tasks = [
            asyncio.create_task(data_processor(i))
            for i in range(1, 4)  # 3 processors
        ]

        await asyncio.sleep(0.5)  # Let everyone get ready
        print("👨💼 Coordinator: Sending start signal!")
        start_event.set()

        # Wait for processing to complete
        await processing_complete_event.wait()
        print("👨💼 Coordinator: All work completed!")

        # Wait for all tasks to finish
        await producer_task
        await asyncio.gather(*processor_tasks)
        return shared_data["processed_items"]

    print("=== Event-Coordinated Processing Pipeline ===")
    results = await coordinator()
    print(f"📊 Final results: {len(results)} items processed")
    for item in results:
        print(f"  ✅ {item}")

async def demonstrate_event_patterns():
    """Shows common event usage patterns"""

    async def waiter(waiter_id, event):
        """Waits for an event with a timeout"""
        print(f"⏳ Waiter {waiter_id}: Waiting for event...")
        try:
            await asyncio.wait_for(event.wait(), timeout=2.0)
            print(f"🎉 Waiter {waiter_id}: Event received!")
        except asyncio.TimeoutError:
            print(f"⏰ Waiter {waiter_id}: Timeout waiting for event")

    async def delayed_setter(event, delay, event_name):
        """Sets an event after a delay"""
        print(f"⏰ Setter: Will signal '{event_name}' in {delay}s")
        await asyncio.sleep(delay)
        print(f"📢 Setter: Signaling '{event_name}'!")
        event.set()

    print("\n=== Event Timeout Patterns ===")

    # Test 1: Event set before the timeout
    quick_event = asyncio.Event()
    await asyncio.gather(
        waiter("A", quick_event),
        delayed_setter(quick_event, 1.0, "quick_event")
    )

    # Test 2: Event not set before the timeout
    slow_event = asyncio.Event()
    await asyncio.gather(
        waiter("B", slow_event),
        delayed_setter(slow_event, 3.0, "slow_event")  # Will time out
    )

async def main():
    await demonstrate_event_coordination()
    await demonstrate_event_patterns()

asyncio.run(main())
Error Handling and Resilience¶
Proper error handling in asyncio requires understanding how exceptions propagate through concurrent operations.
Exception Propagation in Concurrent Code¶
import asyncio
import random

async def demonstrate_exception_propagation():
    """Shows how exceptions behave in different asyncio patterns"""

    async def reliable_task(task_id):
        await asyncio.sleep(0.5)
        return f"Success from {task_id}"

    async def unreliable_task(task_id, failure_rate=0.5):
        await asyncio.sleep(0.5)
        if random.random() < failure_rate:
            raise ValueError(f"Task {task_id} failed randomly!")
        return f"Success from {task_id}"

    print("=== gather() Exception Behavior ===")
    print("🧪 Test 1: gather() with default behavior (fail fast)")
    try:
        results = await asyncio.gather(
            reliable_task("R1"),
            unreliable_task("U1", failure_rate=1.0),  # Will definitely fail
            reliable_task("R2")
        )
        print(f"✅ All completed: {results}")
    except Exception as e:
        print(f"❌ gather() failed: {e}")
        print("💡 Note: When one task fails, gather() raises immediately")

    print("\n🧪 Test 2: gather() with return_exceptions=True")
    results = await asyncio.gather(
        reliable_task("R3"),
        unreliable_task("U2", failure_rate=1.0),  # Will definitely fail
        reliable_task("R4"),
        return_exceptions=True  # Return exceptions instead of raising
    )

    print("📊 Mixed results:")
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"  Task {i+1}: ❌ {result}")
        else:
            print(f"  Task {i+1}: ✅ {result}")

async def demonstrate_task_exception_handling():
    """Shows proper exception handling with individual tasks"""

    async def monitored_task(task_id, should_fail=False):
        try:
            await asyncio.sleep(0.3)
            if should_fail:
                raise RuntimeError(f"Task {task_id} encountered an error")
            return f"Result from {task_id}"
        except asyncio.CancelledError:
            print(f"🛑 Task {task_id}: Cancelled")
            raise  # Always re-raise CancelledError

    print("\n=== Individual Task Exception Handling ===")

    # Create tasks
    tasks = [
        asyncio.create_task(monitored_task("T1", should_fail=False)),
        asyncio.create_task(monitored_task("T2", should_fail=True)),
        asyncio.create_task(monitored_task("T3", should_fail=False)),
        asyncio.create_task(monitored_task("T4", should_fail=True)),
    ]

    # Handle each task individually
    results = []
    for i, task in enumerate(tasks):
        try:
            result = await task
            results.append(result)
            print(f"✅ Task {i+1}: {result}")
        except Exception as e:
            results.append(f"Error: {e}")
            print(f"❌ Task {i+1}: {e}")

    print(f"📊 Final results: {len([r for r in results if not r.startswith('Error')])} successful")

async def demonstrate_timeout_handling():
    """Shows comprehensive timeout handling strategies"""

    async def slow_operation(operation_id, duration):
        print(f"🐌 Operation {operation_id}: Starting ({duration}s)")
        await asyncio.sleep(duration)
        print(f"✅ Operation {operation_id}: Completed")
        return f"Result from {operation_id}"

    print("\n=== Timeout Handling Strategies ===")

    # Strategy 1: Individual operation timeout
    print("🧪 Individual operation timeout:")
    try:
        result = await asyncio.wait_for(
            slow_operation("Slow1", 3),
            timeout=2.0
        )
        print(f"✅ Result: {result}")
    except asyncio.TimeoutError:
        print("⏰ Operation timed out")

    # Strategy 2: Partial completion with timeout
    print("\n🧪 Partial completion with timeout:")
    # asyncio.wait() requires tasks (passing bare coroutines was removed in 3.11)
    operations = [
        asyncio.create_task(slow_operation("Op1", 1)),  # Will complete
        asyncio.create_task(slow_operation("Op2", 2)),  # Will complete
        asyncio.create_task(slow_operation("Op3", 4)),  # Will time out
        asyncio.create_task(slow_operation("Op4", 5)),  # Will time out
    ]

    try:
        completed, pending = await asyncio.wait(
            operations,
            timeout=2.5,
            return_when=asyncio.ALL_COMPLETED
        )
        print(f"📊 Completed: {len(completed)}, Pending: {len(pending)}")

        # Process completed tasks
        for task in completed:
            try:
                result = await task
                print(f"✅ Completed: {result}")
            except Exception as e:
                print(f"❌ Failed: {e}")

        # Cancel pending tasks
        for task in pending:
            task.cancel()

        # Wait for cancellation to complete
        if pending:
            await asyncio.wait(pending)
    except Exception as e:
        print(f"❌ Unexpected error: {e}")

async def demonstrate_resilient_patterns():
    """Shows patterns for building resilient async applications"""

    async def unreliable_service_call(call_id):
        """Simulates an unreliable external service"""
        await asyncio.sleep(0.1)
        if random.random() < 0.7:  # 70% failure rate
            raise ConnectionError(f"Service call {call_id} failed")
        return f"Service result {call_id}"

    async def retry_with_backoff(operation, max_retries=3, base_delay=0.1):
        """Retry an operation with exponential backoff"""
        for attempt in range(max_retries + 1):
            try:
                return await operation()
            except Exception as e:
                if attempt == max_retries:
                    raise  # Final attempt failed
                delay = base_delay * (2 ** attempt)  # Exponential backoff
                print(f"⚠️ Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s...")
                await asyncio.sleep(delay)

    async def circuit_breaker_call(service_func, failure_threshold=3):
        """Simple circuit breaker pattern (state kept as function attributes)"""
        if not hasattr(circuit_breaker_call, 'failures'):
            circuit_breaker_call.failures = 0
            circuit_breaker_call.last_failure_time = 0

        # Check if the circuit is open
        if circuit_breaker_call.failures >= failure_threshold:
            time_since_failure = asyncio.get_running_loop().time() - circuit_breaker_call.last_failure_time
            if time_since_failure < 5.0:  # 5 second cool-down
                raise Exception("Circuit breaker is OPEN - service unavailable")
            else:
                # Reset the circuit breaker
                circuit_breaker_call.failures = 0

        try:
            result = await service_func()
            circuit_breaker_call.failures = 0  # Reset on success
            return result
        except Exception:
            circuit_breaker_call.failures += 1
            circuit_breaker_call.last_failure_time = asyncio.get_running_loop().time()
            raise

    print("\n=== Resilient Service Calls ===")

    # Test the retry pattern
    print("🔄 Testing retry with backoff:")
    try:
        result = await retry_with_backoff(
            lambda: unreliable_service_call("RetryTest"),
            max_retries=3
        )
        print(f"✅ Retry succeeded: {result}")
    except Exception as e:
        print(f"❌ All retries failed: {e}")

    # Test the circuit breaker pattern
    print("\n🔌 Testing circuit breaker:")
    for i in range(8):
        try:
            result = await circuit_breaker_call(
                lambda: unreliable_service_call(f"CB-{i}")
            )
            print(f"✅ Call {i}: {result}")
        except Exception as e:
            print(f"❌ Call {i}: {e}")

async def main():
    await demonstrate_exception_propagation()
    await demonstrate_task_exception_handling()
    await demonstrate_timeout_handling()
    await demonstrate_resilient_patterns()

asyncio.run(main())
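One more resilience tool worth knowing: asyncio.shield() stops cancellation from propagating into an operation, so a wait_for() timeout abandons the wait without killing the underlying work. A minimal sketch:

import asyncio

async def critical_write():
    # Imagine a commit you don't want left half-finished
    await asyncio.sleep(2)
    print("commit finished")
    return "ok"

async def main():
    task = asyncio.create_task(critical_write())
    try:
        # wait_for() times out, but shield() absorbs the cancellation:
        # only the wait is abandoned, not the underlying task
        await asyncio.wait_for(asyncio.shield(task), timeout=0.5)
    except asyncio.TimeoutError:
        print("stopped waiting, but the commit continues")
    print(await task)  # still completes normally

asyncio.run(main())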
Performance Optimization and Best Practices¶
Understanding asyncio performance characteristics helps you build efficient applications.
Connection Pooling and Resource Management¶
import asyncio
import aiohttp
import time

class HighPerformanceHTTPClient:
    """Demonstrates proper resource management and connection pooling"""

    def __init__(self, max_connections=100, requests_per_host=30):
        # Configure connection pooling
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,             # Total connection pool size
            limit_per_host=requests_per_host,  # Connections per host
            keepalive_timeout=30,              # Keep connections alive
            enable_cleanup_closed=True,        # Clean up closed connections
            use_dns_cache=True,                # Cache DNS lookups
            ttl_dns_cache=300                  # DNS cache TTL
        )

        # Configure timeouts
        self.timeout = aiohttp.ClientTimeout(
            total=30,     # Total request timeout
            connect=5,    # Connection timeout
            sock_read=10  # Socket read timeout
        )

        self.session = None
        self.request_count = 0
        self.connection_stats = {"created": 0, "reused": 0}

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=self.timeout
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            await self.connector.close()

    async def get(self, url, **kwargs):
        """Make a GET request with connection reuse"""
        self.request_count += 1
        async with self.session.get(url, **kwargs) as response:
            # Track connection reuse (note: this peeks at aiohttp internals
            # and may not work identically across aiohttp versions)
            connection_key = response.connection.key
            if hasattr(self, '_seen_connections'):
                if connection_key in self._seen_connections:
                    self.connection_stats["reused"] += 1
                else:
                    self.connection_stats["created"] += 1
                    self._seen_connections.add(connection_key)
            else:
                self._seen_connections = {connection_key}
                self.connection_stats["created"] += 1

            return await response.json()

    def get_stats(self):
        """Get performance statistics"""
        return {
            "total_requests": self.request_count,
            "connections_created": self.connection_stats["created"],
            "connections_reused": self.connection_stats["reused"],
            "reuse_rate": self.connection_stats["reused"] / max(1, self.request_count) * 100
        }

async def demonstrate_connection_pooling():
    """Shows the benefits of connection pooling"""
    print("=== Connection Pooling Performance ===")

    async def test_with_pooling():
        """Test with proper connection pooling"""
        async with HighPerformanceHTTPClient(max_connections=10) as client:
            start_time = time.time()

            # Make many requests to the same host
            tasks = [
                client.get(f"https://httpbin.org/get?id={i}")
                for i in range(20)
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            duration = time.time() - start_time
            stats = client.get_stats()
            successful = len([r for r in results if not isinstance(r, Exception)])

            return {
                "duration": duration,
                "successful_requests": successful,
                "stats": stats
            }

    async def test_without_pooling():
        """Test without connection pooling (creates a new session each time)"""
        async def single_request(url):
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.json()

        start_time = time.time()
        tasks = [
            single_request(f"https://httpbin.org/get?id={i}")
            for i in range(20)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        duration = time.time() - start_time

        successful = len([r for r in results if not isinstance(r, Exception)])
        return {
            "duration": duration,
            "successful_requests": successful,
            "stats": {"note": "No pooling - each request creates a new connection"}
        }

    # Test both approaches
    print("🔄 Testing WITH connection pooling:")
    pooled_results = await test_with_pooling()

    print("🔄 Testing WITHOUT connection pooling:")
    no_pool_results = await test_without_pooling()

    # Compare results
    print("\n📊 Performance Comparison:")
    print(f"  With pooling: {pooled_results['duration']:.2f}s for {pooled_results['successful_requests']} requests")
    print(f"  Without pooling: {no_pool_results['duration']:.2f}s for {no_pool_results['successful_requests']} requests")
    print(f"  Speedup: {no_pool_results['duration'] / pooled_results['duration']:.1f}x")

    print("\n🔌 Connection Statistics:")
    stats = pooled_results['stats']
    print(f"  Connections created: {stats['connections_created']}")
    print(f"  Connections reused: {stats['connections_reused']}")
    print(f"  Reuse rate: {stats['reuse_rate']:.1f}%")

async def demonstrate_batching_optimization():
    """Shows the performance benefits of batching operations"""

    async def process_item(item_id):
        """Simulate processing a single item"""
        await asyncio.sleep(0.1)  # Simulate processing time
        return f"Processed-{item_id}"

    async def process_batch(item_ids):
        """Simulate processing a batch of items (more efficient)"""
        await asyncio.sleep(0.05 * len(item_ids))  # Batch processing amortizes overhead
        return [f"BatchProcessed-{item_id}" for item_id in item_ids]

    print("\n=== Batching vs Individual Processing ===")
    items = list(range(1, 21))  # 20 items to process

    # Individual processing
    print("🔄 Individual processing:")
    start_time = time.time()
    individual_results = await asyncio.gather(*[
        process_item(item_id) for item_id in items
    ])
    individual_time = time.time() - start_time

    # Batch processing
    print("🔄 Batch processing:")
    batch_size = 5
    batches = [items[i:i+batch_size] for i in range(0, len(items), batch_size)]

    start_time = time.time()
    batch_results = await asyncio.gather(*[
        process_batch(batch) for batch in batches
    ])
    # Flatten results
    batch_results_flat = [item for batch in batch_results for item in batch]
    batch_time = time.time() - start_time

    print("\n📊 Batching Performance:")
    print(f"  Individual: {individual_time:.2f}s for {len(individual_results)} items")
    print(f"  Batched: {batch_time:.2f}s for {len(batch_results_flat)} items")
    print(f"  Speedup: {individual_time / batch_time:.1f}x")

async def demonstrate_memory_efficient_streaming():
    """Shows memory-efficient processing of large datasets"""

    async def generate_large_dataset():
        """Simulate a large data source"""
        for i in range(10000):  # Large dataset
            yield {"id": i, "data": f"item_{i}", "value": i * 2}
            # Small delay to simulate a real data source
            if i % 100 == 0:
                await asyncio.sleep(0.001)

    async def process_streaming_data():
        """Process data as it arrives (memory efficient)"""
        processed_count = 0
        total_value = 0

        async for item in generate_large_dataset():
            # Process each item as it arrives
            total_value += item["value"]
            processed_count += 1
            if processed_count % 1000 == 0:
                print(f"📊 Processed {processed_count} items (streaming)")

        return {"count": processed_count, "total_value": total_value}

    async def process_batch_loaded_data():
        """Load all data, then process (memory intensive)"""
        # Load all data into memory
        all_data = []
        async for item in generate_large_dataset():
            all_data.append(item)

        print(f"📊 Loaded {len(all_data)} items into memory")

        # Process all data
        total_value = sum(item["value"] for item in all_data)
        return {"count": len(all_data), "total_value": total_value}

    print("\n=== Memory-Efficient Streaming ===")
    import psutil  # Third-party dependency: pip install psutil
    import os

    process = psutil.Process(os.getpid())

    # Test the streaming approach
    print("🌊 Streaming processing:")
    memory_before = process.memory_info().rss / 1024 / 1024  # MB
    start_time = time.time()
    streaming_result = await process_streaming_data()
    streaming_time = time.time() - start_time
    memory_after = process.memory_info().rss / 1024 / 1024  # MB
    memory_used_streaming = memory_after - memory_before

    # Test the batch loading approach
    print("📦 Batch loading processing:")
    memory_before = process.memory_info().rss / 1024 / 1024  # MB
    start_time = time.time()
    batch_result = await process_batch_loaded_data()
    batch_time = time.time() - start_time
    memory_after = process.memory_info().rss / 1024 / 1024  # MB
    memory_used_batch = memory_after - memory_before

    print("\n📊 Memory and Performance Comparison:")
    print(f"  Streaming: {streaming_time:.2f}s, ~{memory_used_streaming:.1f}MB extra memory")
    print(f"  Batch: {batch_time:.2f}s, ~{memory_used_batch:.1f}MB extra memory")
    print(f"  Memory savings with streaming: {memory_used_batch - memory_used_streaming:.1f}MB")

async def main():
    await demonstrate_connection_pooling()
    await demonstrate_batching_optimization()
    await demonstrate_memory_efficient_streaming()

asyncio.run(main())
Common Pitfalls and Solutions¶
Understanding common mistakes helps you write better asyncio code.
Blocking the Event Loop¶
import asyncio
import time
import requests
import aiohttp
import concurrent.futures

async def demonstrate_blocking_problems():
    """Shows how blocking calls break asyncio's concurrency"""
    print("=== The Blocking Problem ===")

    async def bad_concurrent_requests():
        """❌ DON'T DO THIS - blocking calls in async code"""
        print("🐌 Making 'concurrent' requests with a blocking library...")
        start_time = time.time()

        async def blocking_fetch():
            # requests.get() is a blocking call: it freezes the entire event
            # loop, so these "concurrent" coroutines actually run one at a time
            return requests.get("https://httpbin.org/delay/1")

        tasks = [asyncio.create_task(blocking_fetch()) for _ in range(3)]
        await asyncio.gather(*tasks)

        duration = time.time() - start_time
        print(f"🐌 'Concurrent' requests took: {duration:.2f}s")  # ~3s, not ~1s
        return duration

    async def truly_blocking_example():
        """❌ What NOT to do - direct blocking calls"""
        print("🚫 Direct blocking calls (DON'T DO THIS):")
        start_time = time.time()

        # This completely blocks the event loop!
        # We'll simulate it with a CPU-intensive task
        def cpu_intensive_work():
            total = 0
            for i in range(1000000):
                total += i ** 2
            return total

        # This blocks everything
        result = cpu_intensive_work()
        duration = time.time() - start_time
        print(f"🚫 Blocking operation took: {duration:.2f}s")
        return duration

    async def good_concurrent_requests():
        """✅ DO THIS - proper async requests"""
        print("🚀 Making truly concurrent requests...")
        start_time = time.time()

        async with aiohttp.ClientSession() as session:
            tasks = [
                session.get("https://httpbin.org/delay/1")
                for _ in range(3)
            ]
            responses = await asyncio.gather(*tasks)

            # Close responses
            for response in responses:
                response.close()

        duration = time.time() - start_time
        print(f"🚀 Truly concurrent requests took: {duration:.2f}s")
        return duration

    async def good_cpu_intensive_work():
        """✅ DO THIS - CPU work in an executor"""
        print("⚡ CPU work in executor:")
        start_time = time.time()

        def cpu_intensive_work():
            total = 0
            for i in range(1000000):
                total += i ** 2
            return total

        # Run CPU work in a thread pool
        loop = asyncio.get_running_loop()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            result = await loop.run_in_executor(executor, cpu_intensive_work)

        duration = time.time() - start_time
        print(f"⚡ Non-blocking CPU work took: {duration:.2f}s")
        return duration

    # Compare approaches
    good_requests_time = await good_concurrent_requests()
    bad_requests_time = await bad_concurrent_requests()
    print("\n📊 Request Performance:")
    print(f"  Speedup with proper async: {bad_requests_time / good_requests_time:.1f}x")

    blocking_time = await truly_blocking_example()
    nonblocking_time = await good_cpu_intensive_work()
    print("\n📊 CPU Work Performance:")
    print("  Both approaches take similar wall time, but the executor version doesn't block the event loop")

async def demonstrate_resource_leaks():
    """Shows resource leak problems and solutions"""
    print("\n=== Resource Management ===")

    async def resource_leak_example():
        """❌ Resource leak example"""
        print("🚨 Resource leak example (DON'T DO THIS):")
        sessions = []
        for i in range(5):
            # Creating sessions but not closing them!
            session = aiohttp.ClientSession()
            sessions.append(session)
            try:
                async with session.get("https://httpbin.org/get") as response:
                    await response.json()
                    print(f"  Request {i+1}: Status {response.status}")
            except Exception as e:
                print(f"  Request {i+1}: Error {e}")

        # Sessions are never closed - RESOURCE LEAK!
        print("🚨 Sessions created but never closed - memory and connection leak!")

        # Clean up for the demo (you shouldn't rely on this)
        for session in sessions:
            await session.close()

    async def proper_resource_management():
        """✅ Proper resource management"""
        print("✅ Proper resource management:")

        # Method 1: Context manager (preferred)
        async with aiohttp.ClientSession() as session:
            tasks = [
                session.get("https://httpbin.org/get")
                for _ in range(5)
            ]
            responses = await asyncio.gather(*tasks)

            for i, response in enumerate(responses):
                print(f"  Request {i+1}: Status {response.status}")
                response.close()

        # Session automatically closed by the context manager
        print("✅ Session automatically closed by context manager")

    async def manual_cleanup_example():
        """✅ Manual cleanup when a context manager isn't available"""
        print("✅ Manual cleanup example:")
        session = aiohttp.ClientSession()
        try:
            async with session.get("https://httpbin.org/get") as response:
                await response.json()
                print(f"  Manual cleanup: Status {response.status}")
        except Exception as e:
            print(f"  Error: {e}")
        finally:
            await session.close()  # Always close in a finally block
            print("✅ Session manually closed in finally block")

    await resource_leak_example()
    await proper_resource_management()
    await manual_cleanup_example()

async def demonstrate_deadlock_avoidance():
    """Shows how to avoid deadlocks in async code"""
    print("\n=== Deadlock Avoidance ===")

    async def deadlock_example():
        """⚠️ Potential deadlock scenario"""
        lock1 = asyncio.Lock()
        lock2 = asyncio.Lock()

        async def task_a():
            async with lock1:
                print("🔒 Task A: Acquired lock1")
                await asyncio.sleep(0.1)
                # Try to acquire lock2 while holding lock1
                print("🔒 Task A: Trying to acquire lock2...")
                async with lock2:
                    print("🔒 Task A: Acquired lock2")

        async def task_b():
            async with lock2:
                print("🔒 Task B: Acquired lock2")
                await asyncio.sleep(0.1)
                # Try to acquire lock1 while holding lock2
                print("🔒 Task B: Trying to acquire lock1...")
                async with lock1:
                    print("🔒 Task B: Acquired lock1")

        print("⚠️ Potential deadlock scenario (tasks acquire locks in different order):")
        try:
            # This might deadlock!
            await asyncio.wait_for(
                asyncio.gather(task_a(), task_b()),
                timeout=1.0
            )
            print("✅ No deadlock occurred")
        except asyncio.TimeoutError:
            print("❌ Deadlock detected! Tasks timed out")

    async def deadlock_solution():
        """✅ Deadlock-free solution"""
        lock1 = asyncio.Lock()
        lock2 = asyncio.Lock()

        async def safe_task_a():
            # Always acquire locks in the same order
            async with lock1:
                print("✅ Safe Task A: Acquired lock1")
                async with lock2:
                    print("✅ Safe Task A: Acquired lock2")
                    await asyncio.sleep(0.1)

        async def safe_task_b():
            # Same order as task A
            async with lock1:
                print("✅ Safe Task B: Acquired lock1")
                async with lock2:
                    print("✅ Safe Task B: Acquired lock2")
                    await asyncio.sleep(0.1)

        print("✅ Deadlock-free solution (consistent lock ordering):")
        await asyncio.gather(safe_task_a(), safe_task_b())
        print("✅ All tasks completed successfully")

    await deadlock_example()
    await deadlock_solution()

async def demonstrate_forgotten_await():
    """Shows the consequences of forgetting await"""
    print("\n=== Forgotten await Problems ===")

    async def important_async_function():
        """An async function that should be awaited"""
        await asyncio.sleep(0.1)
        return "Important result"

    async def forgot_await_example():
        """❌ Forgetting to await"""
        print("❌ Forgetting to await:")
        # This doesn't actually run the function!
        result = important_async_function()  # Returns a coroutine object
        print(f"  Result type: {type(result)}")
        print(f"  Result value: {result}")
        print("  ⚠️ The function never actually executed!")

        # Clean up to prevent a warning
        result.close()

    async def correct_await_example():
        """✅ Proper await usage"""
        print("✅ Proper await usage:")
        result = await important_async_function()  # Actually runs the function
        print(f"  Result: {result}")
        print("  ✅ Function executed and returned result")

    async def loop_await_mistake():
        """❌ Common mistake in loops"""
        print("❌ Forgetting await in loops:")
        coroutines = []
        for i in range(3):
            # This creates coroutine objects but doesn't run them
            coro = important_async_function()
            coroutines.append(coro)

        print(f"  Created {len(coroutines)} coroutine objects")
        print("  ⚠️ None of them have executed yet!")

        # Clean up
        for coro in coroutines:
            coro.close()

    async def correct_loop_example():
        """✅ Correct loop with await"""
        print("✅ Correct loop with concurrent execution:")
        # Create tasks that start immediately
        tasks = [
            asyncio.create_task(important_async_function())
            for _ in range(3)
        ]
        results = await asyncio.gather(*tasks)
        print(f"  Results: {results}")
        print("  ✅ All functions executed concurrently")

    await forgot_await_example()
    await correct_await_example()
    await loop_await_mistake()
    await correct_loop_example()

async def main():
    await demonstrate_blocking_problems()
    await demonstrate_resource_leaks()
    await demonstrate_deadlock_avoidance()
    await demonstrate_forgotten_await()

asyncio.run(main())
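asyncio's debug mode catches several of these pitfalls automatically: it flags coroutines that were never awaited and logs callbacks that block the loop longer than loop.slow_callback_duration (0.1s by default). A minimal sketch:

import asyncio
import time

async def forgotten():
    return "never awaited"

async def blocker():
    # Blocks the loop for 0.2s; debug mode logs callbacks slower
    # than loop.slow_callback_duration (0.1s by default)
    time.sleep(0.2)

async def main():
    forgotten()  # debug mode reports "coroutine ... was never awaited"
    await blocker()

# debug=True enables the checks (equivalently: PYTHONASYNCIODEBUG=1)
asyncio.run(main(), debug=True)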
Real-World Application Patterns¶
Here are comprehensive patterns for building production asyncio applications:
Web Scraping with Rate Limiting¶
import asyncio
import aiohttp
import time
import random
from urllib.parse import urlparse

class IntelligentWebScraper:
    """Production-ready web scraper with rate limiting and error handling"""

    def __init__(self, base_delay=1.0, max_concurrent=5, max_retries=3):
        self.base_delay = base_delay
        self.max_concurrent = max_concurrent
        self.max_retries = max_retries
        self.session = None
        self.semaphore = asyncio.Semaphore(max_concurrent)

        # Statistics
        self.stats = {
            "requests_made": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "retries": 0,
            "rate_limited": 0
        }

        # Rate limiting per domain
        self.last_request_time = {}
        self.domain_delays = {}

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(
            limit=50,
            limit_per_host=10,
            keepalive_timeout=30
        )
        timeout = aiohttp.ClientTimeout(total=30)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (compatible; AsyncScraper/1.0)'
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def smart_delay(self, url):
        """Implement intelligent rate limiting per domain"""
        domain = urlparse(url).netloc

        # Get the domain-specific delay
        domain_delay = self.domain_delays.get(domain, self.base_delay)

        # Check if we need to wait
        if domain in self.last_request_time:
            elapsed = time.time() - self.last_request_time[domain]
            if elapsed < domain_delay:
                wait_time = domain_delay - elapsed
                print(f"⏰ Rate limiting: waiting {wait_time:.1f}s for {domain}")
                await asyncio.sleep(wait_time)
                self.stats["rate_limited"] += 1

        self.last_request_time[domain] = time.time()

    async def fetch_with_retries(self, url):
        """Fetch a URL with intelligent retry logic"""
        domain = urlparse(url).netloc

        for attempt in range(self.max_retries + 1):
            try:
                async with self.semaphore:  # Limit concurrent requests
                    await self.smart_delay(url)  # Rate limiting
                    self.stats["requests_made"] += 1

                    async with self.session.get(url) as response:
                        if response.status == 200:
                            content = await response.text()
                            self.stats["successful_requests"] += 1
                            return {
                                "url": url,
                                "content": content,
                                "status": response.status,
                                "headers": dict(response.headers)
                            }
                        elif response.status == 429:  # Rate limited
                            # Increase the delay for this domain
                            self.domain_delays[domain] = self.domain_delays.get(domain, self.base_delay) * 2
                            raise aiohttp.ClientResponseError(
                                request_info=response.request_info,
                                history=response.history,
                                status=response.status,
                                message="Rate limited"
                            )
                        else:
                            raise aiohttp.ClientResponseError(
                                request_info=response.request_info,
                                history=response.history,
                                status=response.status
                            )
            except Exception as e:
                self.stats["failed_requests"] += 1
                if attempt == self.max_retries:
                    print(f"❌ Failed to fetch {url} after {self.max_retries + 1} attempts: {e}")
                    return None

                # Exponential backoff with jitter
                retry_delay = (2 ** attempt) + random.uniform(0, 1)
                print(f"⚠️ Attempt {attempt + 1} failed for {url}: {e}. Retrying in {retry_delay:.1f}s...")
                self.stats["retries"] += 1
                await asyncio.sleep(retry_delay)

        return None

async def demonstrate_intelligent_scraping():
    """Demonstrates intelligent web scraping"""
    print("=== Intelligent Web Scraping ===")

    # URLs to scrape
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/status/200",
        "https://httpbin.org/json",
        "https://httpbin.org/headers",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/429",  # Rate-limited response
        "https://httpbin.org/html",
    ]

    async with IntelligentWebScraper(base_delay=0.5, max_concurrent=3) as scraper:
        start_time = time.time()

        # Scrape all URLs concurrently with rate limiting
        tasks = [scraper.fetch_with_retries(url) for url in urls]
        results = await asyncio.gather(*tasks)

        duration = time.time() - start_time

        # Analyze results
        successful = [r for r in results if r is not None]
        failed = [r for r in results if r is None]

        print("\n📊 Scraping Results:")
        print(f"  Duration: {duration:.2f}s")
        print(f"  Successful: {len(successful)}")
        print(f"  Failed: {len(failed)}")
        print(f"  Total requests made: {scraper.stats['requests_made']}")
        print(f"  Rate limited events: {scraper.stats['rate_limited']}")
        print(f"  Retries performed: {scraper.stats['retries']}")

        # Show successful results
        for result in successful[:3]:  # Show the first 3
            print(f"  ✅ {result['url']}: {result['status']} ({len(result['content'])} chars)")

asyncio.run(demonstrate_intelligent_scraping())
This asyncio cheatsheet provides a comprehensive, deep-dive exploration of asyncio concepts with working examples and practical patterns. It focuses on explaining how things work rather than just listing tips, making it a useful learning and reference resource.