Asynchronous programming in Python, primarily facilitated by the asyncio module, is a concurrency model that allows your program to perform multiple tasks seemingly simultaneously without using traditional threads or multiprocessing. Instead, it achieves concurrency through a single-threaded, event-driven approach. This is particularly beneficial for I/O-bound operations (like network requests, file operations, or database queries) where a program would otherwise spend a lot of time waiting for external resources. By using asyncio, your program can initiate an operation, then "yield" control back to the event loop to perform other tasks while it waits for the first operation to complete, resuming only when the result is available. This leads to more efficient resource utilization and improved responsiveness, especially in web servers, network clients, and other applications that handle many concurrent connections.
Example 1: Beginner-Friendly asyncio Example
import asyncio


async def greet_async(name):
    """
    A simple asynchronous function that greets a person after a short delay.
    This demonstrates the basic structure of an async function.
    """
    print(f"Hello, {name}! (starting)")
    await asyncio.sleep(1)  # Simulate a non-blocking I/O operation (e.g., network call)
    print(f"Hello, {name}! (finished after 1 second)")


async def main_beginner():
    """
    The main asynchronous function to run our greet_async coroutines.
    Each call is awaited in turn, so the greetings run one after the other.
    """
    print("Starting beginner async example...")
    await greet_async("Alice")
    await greet_async("Bob")
    print("Beginner async example finished.")


if __name__ == "__main__":
    asyncio.run(main_beginner())
Explanation:
This beginner-friendly example introduces the core concepts of async and await. The greet_async function is defined with async def, marking it as a coroutine function. Inside, await asyncio.sleep(1) simulates a task that takes time but doesn't block the event loop. The main_beginner function then awaits greet_async twice, one call after the other. When main_beginner is executed with asyncio.run(), "Hello, Alice! (starting)" prints first, Alice's "finished" message follows a second later, and only then does Bob's greeting start, so the whole run takes about two seconds. Awaiting a coroutine directly runs it to completion before the next line executes; the calls overlap only when they are scheduled together, for example with asyncio.gather or asyncio.create_task. While greet_async is "waiting," the event loop is free to run other scheduled work, though this example gives it none.
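To make the difference between awaiting in sequence and running concurrently concrete, here is a minimal sketch that times both approaches. The greet helper and its 0.1-second delay are hypothetical stand-ins chosen to keep the run short; they are not part of the example above.

```python
import asyncio
import time


async def greet(name, delay=0.1):
    """Hypothetical stand-in for greet_async with a shorter delay."""
    await asyncio.sleep(delay)
    return f"Hello, {name}!"


async def run_sequentially():
    # Each await finishes before the next call starts.
    start = time.perf_counter()
    greetings = [await greet("Alice"), await greet("Bob")]
    return greetings, time.perf_counter() - start


async def run_concurrently():
    # gather schedules both coroutines, so their sleeps overlap.
    start = time.perf_counter()
    greetings = await asyncio.gather(greet("Alice"), greet("Bob"))
    return greetings, time.perf_counter() - start


if __name__ == "__main__":
    _, sequential = asyncio.run(run_sequentially())
    _, concurrent = asyncio.run(run_concurrently())
    print(f"sequential: {sequential:.2f}s, concurrent: {concurrent:.2f}s")
```

The sequential version should take roughly the sum of the delays, while the concurrent version takes roughly the longest single delay.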
Example 2: Intermediate asyncio with asyncio.gather
import asyncio
import time


async def fetch_data(url):
    """
    Simulates fetching data from a URL asynchronously.
    This demonstrates how multiple I/O-bound tasks can run concurrently.
    """
    start_time = time.time()
    print(f"Fetching data from {url}...")
    await asyncio.sleep(2)  # Simulate network request delay
    end_time = time.time()
    print(f"Finished fetching data from {url} in {end_time - start_time:.2f} seconds.")
    return f"Data from {url}"


async def main_intermediate():
    """
    Uses asyncio.gather to run multiple coroutines concurrently,
    waiting for all of them to complete.
    """
    print("Starting intermediate async example...")
    urls = ["https://api.example.com/data1", "https://api.example.com/data2", "https://api.example.com/data3"]
    # asyncio.gather runs coroutines concurrently
    results = await asyncio.gather(
        *[fetch_data(url) for url in urls]
    )
    print("\nAll data fetched:")
    for result in results:
        print(f"- {result}")
    print("Intermediate async example finished.")


if __name__ == "__main__":
    asyncio.run(main_intermediate())
Explanation:
This intermediate example introduces asyncio.gather, a powerful tool for running multiple coroutines concurrently and collecting their results. The fetch_data function simulates network requests. If we were to await each fetch_data call sequentially, the total time would be roughly 6 seconds (3 * 2 seconds). However, by using asyncio.gather, all three fetch_data calls start almost simultaneously, and the total execution time for main_intermediate will be closer to 2 seconds (the duration of the longest await asyncio.sleep). This clearly demonstrates the efficiency gains of asynchronous programming for I/O-bound tasks.
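Two details of asyncio.gather are worth seeing in isolation: results come back in the order the awaitables were passed in, not the order they finished, and return_exceptions=True hands back exceptions as values instead of raising them. The sketch below assumes a hypothetical fetch helper with short delays.

```python
import asyncio


async def fetch(label, delay, fail=False):
    """Hypothetical fetch: sleeps, then returns its label or raises."""
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{label} failed")
    return label


async def gather_in_order():
    # "slow" finishes last but still comes first in the result list.
    return await asyncio.gather(fetch("slow", 0.05), fetch("fast", 0.01))


async def gather_with_errors():
    # With return_exceptions=True the exception is returned as a value
    # instead of propagating out of gather.
    return await asyncio.gather(
        fetch("ok", 0.01),
        fetch("bad", 0.01, fail=True),
        return_exceptions=True,
    )
```

Calling asyncio.run(gather_with_errors()) yields a list whose second element is the ValueError instance, which the caller can inspect with isinstance.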
Example 3: Advanced asyncio with asyncio.Queue and Producer/Consumer
import asyncio
import random


async def producer(queue, num_items):
    """
    Asynchronous producer function that puts random numbers into a queue.
    Demonstrates adding items to a shared asynchronous queue.
    """
    for _ in range(num_items):
        item = random.randint(1, 100)
        print(f"Producer: Putting {item} into queue.")
        await queue.put(item)
        await asyncio.sleep(random.uniform(0.1, 0.5))  # Simulate work


async def consumer(queue, consumer_id):
    """
    Asynchronous consumer function that takes numbers from a queue and processes them.
    Demonstrates taking items from a shared asynchronous queue.
    """
    while True:
        print(f"Consumer {consumer_id}: Waiting for item...")
        item = await queue.get()
        if item is None:
            # The sentinel counts as a queue item, so mark it done as well;
            # otherwise queue.join() would never return.
            queue.task_done()
            break
        print(f"Consumer {consumer_id}: Processing {item}...")
        await asyncio.sleep(random.uniform(0.5, 1.5))  # Simulate processing time
        print(f"Consumer {consumer_id}: Finished processing {item}.")
        queue.task_done()  # Mark the item as processed


async def main_advanced():
    """
    Sets up a producer-consumer pattern using an asyncio.Queue,
    demonstrating inter-coroutine communication and task management.
    """
    print("Starting advanced async example (producer/consumer)...")
    data_queue = asyncio.Queue()
    num_producers = 1
    num_consumers = 3
    num_items_to_produce = 10
    # Create producer tasks
    producer_tasks = [asyncio.create_task(producer(data_queue, num_items_to_produce)) for _ in range(num_producers)]
    # Create consumer tasks
    consumer_tasks = [asyncio.create_task(consumer(data_queue, i + 1)) for i in range(num_consumers)]
    # Wait for all producer tasks to complete
    await asyncio.gather(*producer_tasks)
    # Signal that production is done: one None sentinel per consumer
    for _ in range(num_consumers):
        await data_queue.put(None)
    # Wait until every item and every sentinel has been marked with task_done()
    await data_queue.join()
    # Each consumer exits after reading its sentinel; collect the finished tasks
    await asyncio.gather(*consumer_tasks)
    print("Advanced async example (producer/consumer) finished.")


if __name__ == "__main__":
    asyncio.run(main_advanced())
Explanation:
This advanced example showcases inter-coroutine communication using asyncio.Queue, implementing a classic producer-consumer pattern. The producer coroutine adds items to the queue, and multiple consumer coroutines retrieve and process them concurrently. await queue.put(None) is used as a sentinel value to signal consumers that no more items will be produced. queue.join() is crucial here; it waits until all items put into the queue have been processed (marked by queue.task_done()). Every item taken with get() must be matched by a task_done() call, sentinels included, or join() never returns. This pattern is highly valuable for building robust asynchronous systems where different parts of your application need to exchange data efficiently without blocking. The use of asyncio.create_task and asyncio.gather for managing multiple tasks highlights more complex asyncio usage.
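A common alternative to sentinel values is to let consumers loop forever and cancel them once queue.join() returns. The following is a minimal sketch of that shutdown pattern; the worker and run_workers names, and the doubling used as "processing", are hypothetical.

```python
import asyncio


async def worker(queue, results):
    """Hypothetical consumer that loops forever and is stopped by cancellation."""
    while True:
        item = await queue.get()
        try:
            results.append(item * 2)  # Stand-in for real processing
        finally:
            queue.task_done()  # Always balance get() with task_done()


async def run_workers(items, num_workers=2):
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(num_workers)]
    for item in items:
        queue.put_nowait(item)
    await queue.join()  # Returns once every item has been marked done
    for task in workers:
        task.cancel()  # Workers are idle in queue.get() at this point
    # return_exceptions=True absorbs the CancelledError from each worker
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)
```

This variant needs no sentinel bookkeeping, at the cost of relying on cancellation to stop the consumers.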
Async and Await Keywords
The async and await keywords are the fundamental building blocks of asynchronous programming in Python, introduced in Python 3.5. They are used to define and control coroutines, which are special functions that can be paused and resumed.
The async keyword is used to declare a coroutine function. Calling a function defined with async def does not run its body; it returns a coroutine object, which is "awaitable": its execution can be suspended at an await expression until the awaited operation completes, and then resumed. The keyword signals to Python that the function will likely contain await expressions and participate in the asynchronous event loop.
The await keyword can only be used inside an async def function. It's used to pause the execution of the current coroutine until the awaited "awaitable" (another coroutine, a Future, or a Task) completes. While the current coroutine is paused, the event loop can switch to and execute other ready coroutines, thus achieving cooperative multitasking. This non-blocking behavior is central to asynchronous programming, preventing the entire program from freezing during I/O operations.
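The three kinds of awaitable mentioned above can be seen side by side in a short sketch. The compute helper is hypothetical, and the bare Future is resolved by a callback purely for illustration.

```python
import asyncio


async def compute(value):
    await asyncio.sleep(0.01)
    return value * 2


async def await_three_kinds():
    loop = asyncio.get_running_loop()

    # 1. Awaiting a coroutine object directly.
    from_coroutine = await compute(1)

    # 2. Awaiting a Task, which wraps a coroutine and schedules it on the loop.
    task = asyncio.create_task(compute(2))
    from_task = await task

    # 3. Awaiting a bare Future that some other code resolves later.
    future = loop.create_future()
    loop.call_soon(future.set_result, 6)
    from_future = await future

    return from_coroutine, from_task, from_future
```

In application code you will mostly await coroutines and Tasks; bare Futures tend to appear in lower-level or callback-bridging code.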
Example 1: Basic async and await Usage
import asyncio


async def my_simple_coroutine():
    """
    A basic coroutine demonstrating the use of async and await.
    It simulates a short delay.
    """
    print("Coroutine started.")
    await asyncio.sleep(0.5)  # Pause execution for 0.5 seconds, allowing others to run
    print("Coroutine finished.")


async def main_basic_async_await():
    """
    The main function to run our simple coroutine.
    """
    print("Running basic async/await example.")
    await my_simple_coroutine()
    print("Basic async/await example completed.")


if __name__ == "__main__":
    asyncio.run(main_basic_async_await())
Explanation:
This example demonstrates the most fundamental use of async and await. my_simple_coroutine is marked async def, making it a coroutine. Inside, await asyncio.sleep(0.5) pauses my_simple_coroutine for half a second. During this pause, the asyncio event loop is free to run other tasks (though there are none in this simple example). When the sleep is over, my_simple_coroutine resumes. This illustrates how await yields control.
Example 2: async and await with Return Values
import asyncio


async def fetch_user_data(user_id):
    """
    Simulates fetching user data from a database or API asynchronously.
    Returns the fetched data.
    """
    print(f"Fetching data for user {user_id}...")
    await asyncio.sleep(1)  # Simulate network/database latency
    data = {"id": user_id, "name": f"User {user_id}_Name", "email": f"user{user_id}@example.com"}
    print(f"Finished fetching data for user {user_id}.")
    return data


async def process_user_data(user_data):
    """
    Simulates processing user data asynchronously.
    """
    print(f"Processing data for {user_data['name']}...")
    # Simulate processing latency (real CPU-bound work would block the event loop)
    await asyncio.sleep(0.7)
    processed_info = f"Processed {user_data['name']} - Email: {user_data['email']}"
    print(f"Finished processing data for {user_data['name']}.")
    return processed_info


async def main_return_values():
    """
    Demonstrates awaiting multiple coroutines and using their return values.
    """
    print("Starting async/await with return values example.")
    user_1_data = await fetch_user_data(1)
    user_2_data = await fetch_user_data(2)
    processed_1 = await process_user_data(user_1_data)
    processed_2 = await process_user_data(user_2_data)
    print("\n--- Results ---")
    print(processed_1)
    print(processed_2)
    print("Async/await with return values example finished.")


if __name__ == "__main__":
    asyncio.run(main_return_values())
Explanation:
This example demonstrates that coroutines defined with async def can return values, just like regular functions. The fetch_user_data coroutine fetches data, and its result is then awaited and assigned to user_1_data. This data is then passed to process_user_data, which is also awaited. This highlights how the flow of data works within an asynchronous program, allowing sequential operations that still benefit from awaiting non-blocking tasks.
Example 3: Advanced async and await with asyncio.create_task for Concurrent Execution
import asyncio
import time


async def worker(name, delay):
    """
    A worker coroutine that simulates work and reports its completion.
    Demonstrates using create_task for concurrent execution.
    """
    print(f"{name}: Starting work for {delay} seconds.")
    await asyncio.sleep(delay)
    print(f"{name}: Finished work.")
    return f"{name} completed in {delay}s"


async def main_advanced_async_await():
    """
    Uses asyncio.create_task to run multiple worker coroutines concurrently
    and then awaits their completion.
    """
    print("Starting advanced async/await with create_task example.")
    start_time = time.time()
    # Create tasks without immediately awaiting them
    task1 = asyncio.create_task(worker("Worker A", 3))
    task2 = asyncio.create_task(worker("Worker B", 1))
    task3 = asyncio.create_task(worker("Worker C", 2))
    # Await the completion of all tasks
    results = await asyncio.gather(task1, task2, task3)
    end_time = time.time()
    print(f"\nAll tasks completed. Total time: {end_time - start_time:.2f} seconds.")
    for result in results:
        print(f"- {result}")
    print("Advanced async/await with create_task example finished.")


if __name__ == "__main__":
    asyncio.run(main_advanced_async_await())
Explanation:
This advanced example shows how asyncio.create_task is used to schedule coroutines to run concurrently on the event loop. Instead of awaiting each worker directly (which would run them sequentially), create_task immediately schedules the coroutine as a task. The asyncio.gather then efficiently awaits the completion of all these tasks. You'll observe that "Worker A: Starting work...", "Worker B: Starting work...", and "Worker C: Starting work..." print almost at the same time, and the total execution time is roughly the duration of the longest task (3 seconds), demonstrating true concurrency for I/O-bound operations.
Event Loop
The event loop is the heart of every asyncio application and the central orchestrator of asynchronous programming in Python. Conceptually, it's a perpetual loop that monitors for events (like a network request completing, data being available to read, or a timer expiring) and dispatches them to the appropriate coroutine. Instead of waiting idly for an I/O operation to finish (which would block the entire program), the event loop allows a coroutine to "yield" control back to it whenever it encounters an await expression, especially during I/O-bound operations.
While one coroutine is paused, waiting for an event, the event loop can then switch its attention to other coroutines that are ready to run or tasks whose awaited operations have completed. This enables cooperative multitasking, where coroutines voluntarily give up control, ensuring that the single-threaded event loop remains active and responsive. The event loop is responsible for:
Registering and scheduling coroutines (tasks).
Monitoring I/O events (e.g., socket readiness).
Managing timers.
Dispatching callbacks when events occur.
You typically interact with the event loop indirectly through asyncio.run(), which creates, manages, and closes the event loop for you. For more advanced scenarios, you might get the current event loop (asyncio.get_running_loop()) to schedule tasks or handle low-level operations.
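As a small illustration of how asyncio.run() and asyncio.get_running_loop() relate, the sketch below checks whether a loop is running from inside and outside a coroutine. The helper names are hypothetical.

```python
import asyncio


def loop_is_running():
    """Return True when called from code running inside an event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:  # Raised when no loop is running in this thread
        return False
    return True


async def check_inside_loop():
    loop = asyncio.get_running_loop()
    # loop.time() is the loop's monotonic clock, the one used for timers.
    return loop_is_running(), isinstance(loop.time(), float)
```

Called from ordinary synchronous code, loop_is_running() reports that no loop exists; inside a coroutine started by asyncio.run(), the loop is available.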
Example 1: Visualizing Event Loop with Simple Timers
import asyncio
import time


async def task_a():
    """
    A simple task that runs quickly.
    """
    print(f"Task A started at {time.time():.2f}")
    await asyncio.sleep(0.1)  # Simulate a small non-blocking operation
    print(f"Task A finished at {time.time():.2f}")


async def task_b():
    """
    A task that takes a bit longer, showing how the event loop switches.
    """
    print(f"Task B started at {time.time():.2f}")
    await asyncio.sleep(0.5)  # Simulate a longer non-blocking operation
    print(f"Task B finished at {time.time():.2f}")


async def main_event_loop_basic():
    """
    Runs multiple tasks concurrently to illustrate event loop's behavior.
    """
    print("Event Loop Basic Example: Starting tasks...")
    # Schedule tasks to run concurrently
    await asyncio.gather(task_a(), task_b())
    print("Event Loop Basic Example: All tasks completed.")


if __name__ == "__main__":
    asyncio.run(main_event_loop_basic())
Explanation:
This example visually demonstrates the event loop's cooperative nature. Both task_a and task_b are started. You'll observe that "Task A started" and "Task B started" print almost simultaneously. Then, after 0.1 seconds, "Task A finished" prints. After another 0.4 seconds (total 0.5 seconds from start), "Task B finished" prints. This is because when task_a hits await asyncio.sleep(0.1), it yields control to the event loop, allowing task_b to start. When task_a's sleep is over, it becomes ready again, and the event loop eventually resumes it. The total execution time is roughly determined by the longest individual task.
Example 2: Intermediate Event Loop - Scheduling Callbacks
import asyncio
import time


def my_callback(message):
    """
    A regular function that serves as a callback.
    """
    print(f"Callback received: '{message}' at {time.time():.2f}")


async def long_running_task():
    """
    A coroutine that simulates a long operation.
    """
    print(f"Long running task started at {time.time():.2f}")
    await asyncio.sleep(2)  # Simulate a long I/O operation
    print(f"Long running task finished at {time.time():.2f}")
    return "Data from long task"


async def main_event_loop_callbacks():
    """
    Demonstrates how to schedule a callback with the event loop and
    how it runs alongside other tasks.
    """
    print("Event Loop Callbacks Example: Starting...")
    loop = asyncio.get_running_loop()
    # Schedule my_callback to run after 1 second
    loop.call_later(1, my_callback, "Scheduled message 1")
    # Schedule my_callback to run after 2.5 seconds
    loop.call_later(2.5, my_callback, "Scheduled message 2")
    # Await a long task; the 1-second callback fires while it sleeps
    result = await long_running_task()
    print(f"Main received: {result}")
    # asyncio.run() closes the loop as soon as this coroutine returns and
    # drops callbacks that have not fired yet, so stay alive past 2.5 s.
    await asyncio.sleep(1)
    print("Event Loop Callbacks Example: Finished.")


if __name__ == "__main__":
    asyncio.run(main_event_loop_callbacks())
Explanation:
This example highlights how the event loop manages scheduled callbacks in addition to coroutines. loop.call_later() allows you to schedule a regular function (my_callback) to be executed after a specified delay. The first callback prints its message at approximately 1 second, while long_running_task is still "awaiting" its sleep. The second is scheduled for 2.5 seconds, after long_running_task has finished; it fires only if the main coroutine is still running at that point, because asyncio.run() closes the loop once main returns and callbacks that have not fired by then are dropped. This demonstrates the event loop's ability to interleave different types of events and tasks to maintain responsiveness.
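loop.call_later() returns a handle that can be used to cancel the callback before it fires. A minimal sketch, with hypothetical names and short delays:

```python
import asyncio


async def schedule_and_cancel():
    loop = asyncio.get_running_loop()
    fired = []

    kept = loop.call_later(0.01, fired.append, "kept")
    dropped = loop.call_later(0.02, fired.append, "dropped")
    dropped.cancel()  # The callback is cancelled before it can fire

    await asyncio.sleep(0.05)  # Keep the loop running past both deadlines
    return fired, kept.cancelled(), dropped.cancelled()
```

Only the callback that was not cancelled appends to the list, and each handle reports its own cancellation state.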
Example 3: Advanced Event Loop - Customizing and Debugging
import asyncio
import time
import logging

# Configure logging for asyncio
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger(__name__)


async def debug_task(name, delay):
    """
    A task that includes logging to show event loop behavior.
    """
    log.debug(f"{name}: Starting, will sleep for {delay}s")
    start = time.time()
    await asyncio.sleep(delay)
    end = time.time()
    log.debug(f"{name}: Finished after {end - start:.2f}s")
    return f"{name} done"


async def main_event_loop_advanced():
    """
    Demonstrates getting the running loop, checking debug mode,
    and running tasks on the loop that `asyncio.run` manages.
    """
    log.info("Event Loop Advanced Example: Starting...")
    # asyncio.run() sets up and closes the loop, including debug mode
    # For demonstration, let's explicitly show getting the loop (though not typically needed with asyncio.run)
    loop = asyncio.get_running_loop()
    log.info(f"Using event loop: {type(loop).__name__}")
    log.debug(f"Event loop debug mode: {loop.get_debug()}")
    # We can enable debug mode explicitly if not using asyncio.run
    # loop.set_debug(True)  # asyncio.run handles this if debug=True is passed
    task1 = asyncio.create_task(debug_task("Task X", 1.5))
    task2 = asyncio.create_task(debug_task("Task Y", 0.5))
    task3 = asyncio.create_task(debug_task("Task Z", 1.0))
    results = await asyncio.gather(task1, task2, task3)
    log.info("\nAll advanced tasks completed. Results:")
    for res in results:
        log.info(f"- {res}")
    log.info("Event Loop Advanced Example: Finished.")


if __name__ == "__main__":
    # asyncio.run(main_event_loop_advanced(), debug=True)  # Recommended way to enable debug
    # For demonstration purposes, we run without debug=True in asyncio.run,
    # as the logging config already provides DEBUG output
    asyncio.run(main_event_loop_advanced())
Explanation:
This advanced example looks at the event loop more directly: it retrieves the running loop with asyncio.get_running_loop(), reports its type and debug setting, and turns on DEBUG-level logging. While asyncio.run() is generally sufficient and handles loop creation/management, understanding asyncio.get_running_loop() is important for more complex scenarios, such as when integrating with existing event-driven frameworks or for detailed debugging. Configuring Python's logging module at DEBUG level surfaces the messages that asyncio itself logs (for example, which selector or proactor the loop uses) alongside the example's own log lines, which show when each task starts and finishes. Passing debug=True to asyncio.run() adds further diagnostics, such as warnings about callbacks that run too long and coroutines that were never awaited.
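To confirm what debug=True changes, a coroutine can read the settings off the running loop. This is a minimal sketch; report_debug_settings is a hypothetical name.

```python
import asyncio


async def report_debug_settings():
    loop = asyncio.get_running_loop()
    # slow_callback_duration is the threshold (in seconds) above which
    # debug mode logs a callback or task step as slow.
    return loop.get_debug(), loop.slow_callback_duration


if __name__ == "__main__":
    print(asyncio.run(report_debug_settings(), debug=True))
```

Debug mode can also be switched on from outside the program by setting the PYTHONASYNCIODEBUG environment variable.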
Coroutines
In Python, a coroutine is a special type of function that can be paused and resumed. Coroutines are the fundamental unit of concurrency in asyncio. Unlike regular functions, which run to completion once called, coroutines can suspend their execution at await expressions, returning control to the event loop. When the awaited operation (e.g., a network request) completes, the event loop can then resume the coroutine from where it left off.
Coroutines are defined using async def. When you call an async def function, it doesn't immediately execute its code. Instead, it returns a coroutine object. To actually run the coroutine, you must "await" it or schedule it as an asyncio.Task on the event loop. This cooperative multitasking model means that coroutines explicitly decide when to yield control, which provides more control over concurrency compared to preemptive multithreading. Coroutines are "awaitable" objects, meaning they can be passed to the await keyword.
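The point that calling an async def function only creates a coroutine object, and that its body runs when awaited, can be checked directly. In this sketch the record helper and the log list are hypothetical.

```python
import asyncio
import inspect


async def record(log, value):
    log.append(value)  # Runs only once the coroutine is awaited
    return value


async def demo():
    log = []
    coro = record(log, "x")             # Creates a coroutine object; body has not run
    is_coroutine = inspect.iscoroutine(coro)
    ran_before_await = len(log) > 0
    result = await coro                 # Now the body executes
    return is_coroutine, ran_before_await, result, log
```

If a coroutine object is created but never awaited or scheduled, Python emits a "coroutine ... was never awaited" RuntimeWarning when the object is garbage collected.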
Example 1: Defining and Awaiting a Simple Coroutine
import asyncio


async def simple_message():
    """
    A very basic coroutine that prints a message after a short delay.
    """
    print("Coroutine: Starting message...")
    await asyncio.sleep(0.3)  # Pause the coroutine for 0.3 seconds
    print("Coroutine: Message displayed!")


async def main_simple_coroutine():
    """
    The entry point to run our simple coroutine.
    """
    print("Main: Calling simple_message coroutine.")
    await simple_message()  # This awaits the coroutine object
    print("Main: simple_message coroutine completed.")


if __name__ == "__main__":
    asyncio.run(main_simple_coroutine())
Explanation:
This example illustrates the simplest form of a coroutine. simple_message is defined with async def, making it a coroutine function. When simple_message() is called within main_simple_coroutine, it returns a coroutine object. The await keyword then tells the event loop to run simple_message until it encounters an await expression (in this case, await asyncio.sleep(0.3)). During that sleep, control returns to the event loop, which can then do other things. Once the sleep is over, simple_message resumes.
Example 2: Coroutines with Parameters and Return Values
import asyncio


async def calculate_square(number):
    """
    A coroutine that calculates the square of a number after a delay.
    It takes a parameter and returns a value.
    """
    print(f"Coroutine: Calculating square of {number}...")
    await asyncio.sleep(0.1)  # Simulate a small computation delay
    result = number * number
    print(f"Coroutine: Finished calculating square of {number}.")
    return result


async def main_coroutine_params():
    """
    Demonstrates calling coroutines with parameters and handling their return values.
    """
    print("Main: Starting coroutine with parameters example.")
    square_of_5 = await calculate_square(5)
    print(f"Main: Square of 5 is: {square_of_5}")
    square_of_10 = await calculate_square(10)
    print(f"Main: Square of 10 is: {square_of_10}")
    print("Main: Coroutine with parameters example completed.")


if __name__ == "__main__":
    asyncio.run(main_coroutine_params())
Explanation:
This example shows that coroutines are just like regular functions in that they can accept arguments and return values. The calculate_square coroutine takes number as input and returns its square. The await keyword effectively "unpacks" the result of the coroutine once it completes. This is a fundamental aspect of building more complex asynchronous logic where data needs to flow between different asynchronous operations.
Example 3: Chaining Coroutines and Task Management
import asyncio


async def download_file(filename, size_mb):
    """
    Simulates downloading a file, returning its name and size.
    """
    print(f"Downloading '{filename}' ({size_mb} MB)...")
    await asyncio.sleep(size_mb / 5)  # Simulate download time
    print(f"Finished downloading '{filename}'.")
    return {"filename": filename, "size_mb": size_mb, "status": "Downloaded"}


async def process_file(file_info):
    """
    Simulates processing a downloaded file.
    """
    filename = file_info["filename"]
    print(f"Processing '{filename}'...")
    await asyncio.sleep(file_info["size_mb"] / 10)  # Simulate processing time
    processed_status = f"Processed {filename}, size {file_info['size_mb']}MB."
    print(f"Finished processing '{filename}'.")
    return processed_status


async def chain_operations(file_name, file_size):
    """
    Chains download and process coroutines.
    """
    print(f"\n--- Initiating chain for {file_name} ---")
    download_result = await download_file(file_name, file_size)
    process_result = await process_file(download_result)
    print(f"--- Chain for {file_name} complete. Result: {process_result} ---")
    return process_result


async def main_chaining_coroutines():
    """
    Demonstrates running multiple chained coroutine operations concurrently
    using asyncio.gather.
    """
    print("Main: Starting chaining coroutines example.")
    tasks = [
        asyncio.create_task(chain_operations("report.pdf", 2)),
        asyncio.create_task(chain_operations("image.jpg", 1)),
        asyncio.create_task(chain_operations("video.mp4", 5)),
    ]
    all_results = await asyncio.gather(*tasks)
    print("\nMain: All chained operations finished. Final results:")
    for res in all_results:
        print(f"- {res}")
    print("Main: Chaining coroutines example completed.")


if __name__ == "__main__":
    asyncio.run(main_chaining_coroutines())
Explanation:
This advanced example demonstrates chaining coroutines and managing multiple such chains concurrently. download_file and process_file are independent coroutines. chain_operations orchestrates them sequentially for a single file. Crucially, main_chaining_coroutines uses asyncio.create_task to turn each chain_operations call into an asyncio.Task, allowing all three chains to run concurrently on the event loop. asyncio.gather then waits for all of them to complete. This pattern is very common in real-world asyncio applications, where you might have multiple independent workflows that need to execute concurrently.
Non-blocking I/O
Non-blocking I/O (Input/Output) is a critical concept in asynchronous programming and the core problem that asyncio aims to solve efficiently. In traditional, "blocking" I/O, when a program performs an operation like reading from a file, making a network request, or querying a database, the program's execution pauses completely until that I/O operation is finished. During this waiting period, the CPU is often idle, and the program cannot perform any other useful work, leading to inefficiency and unresponsiveness, especially in applications handling many concurrent connections.
Non-blocking I/O, on the other hand, allows an application to initiate an I/O operation and immediately continue with other tasks without waiting for the operation to complete. When the I/O operation eventually finishes (e.g., data arrives from the network), the operating system notifies the application (typically via an event loop). The asyncio module, together with the async and await keywords, provides the tools to write Python code that leverages non-blocking I/O effectively. This enables a single thread to handle thousands of concurrent connections or operations, making it highly suitable for high-performance network services, web servers, and data processing pipelines.
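The contrast between blocking and non-blocking waits can be measured with a short sketch. Here time.sleep stands in for any blocking call and asyncio.sleep for a non-blocking one; the function names and the 0.1-second delay are hypothetical.

```python
import asyncio
import time


async def blocking_wait(delay):
    time.sleep(delay)           # Blocks the whole event loop thread


async def non_blocking_wait(delay):
    await asyncio.sleep(delay)  # Suspends only this coroutine


async def timed(wait, count=3, delay=0.1):
    """Run `count` copies of `wait` with gather and return the elapsed time."""
    start = time.perf_counter()
    await asyncio.gather(*(wait(delay) for _ in range(count)))
    return time.perf_counter() - start
```

With the blocking version the three waits run back to back even under gather, so the elapsed time approaches the sum of the delays; with the non-blocking version they overlap.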
Example 1: Simulating Non-blocking Network Request
import asyncio
import time


async def fetch_webpage(url):
    """
    Simulates fetching a webpage using non-blocking I/O.
    """
    start_time = time.time()
    print(f"Initiating fetch for {url}...")
    # In a real scenario, this would be an actual network request using aiohttp or similar
    await asyncio.sleep(1.5)  # Simulate network delay
    end_time = time.time()
    print(f"Finished fetching {url} in {end_time - start_time:.2f} seconds.")
    return f"Content from {url}"


async def main_non_blocking_io_basic():
    """
    Runs multiple simulated non-blocking I/O tasks concurrently.
    """
    print("Non-blocking I/O Basic Example: Starting...")
    urls = [
        "http://example.com/page1",
        "http://example.com/page2",
        "http://example.com/page3",
    ]
    # Fetch all pages concurrently
    results = await asyncio.gather(*[fetch_webpage(url) for url in urls])
    print("\nNon-blocking I/O Basic Example: All fetches complete. Results:")
    for res in results:
        print(f"- {res}")
    print("Non-blocking I/O Basic Example: Finished.")


if __name__ == "__main__":
    asyncio.run(main_non_blocking_io_basic())
Explanation:
This example clearly demonstrates the power of non-blocking I/O. If fetch_webpage were a blocking function, fetching three pages with a 1.5-second delay each would take approximately 4.5 seconds. However, because await asyncio.sleep(1.5) yields control, all three fetch_webpage coroutines start almost simultaneously. The total execution time for main_non_blocking_io_basic will be around 1.5 seconds (the duration of the longest "I/O" operation), showing how non-blocking I/O allows concurrent progress without waiting for each operation to complete sequentially.
Example 2: Intermediate Non-blocking I/O - File Operations (Asyncio to_thread)
import asyncio
import time
import os

# Create a dummy file for demonstration
dummy_file_path = "large_data.txt"
with open(dummy_file_path, "w") as f:
    f.write("This is a line of text.\n" * 100000)  # Create a reasonably large file


def count_lines_blocking(filepath):
    """
    Ordinary blocking function: opens the file and counts its lines.
    """
    with open(filepath, "r") as f:
        return sum(1 for _ in f)  # Blocking read of lines


async def read_large_file_blocking(filepath):
    """
    Reads a large file using blocking I/O, but runs the whole blocking call
    (open, read, close) in a worker thread via run_in_executor, so the
    event loop itself is never blocked.
    """
    print(f"Reading '{filepath}' (blocking read runs in a worker thread)...")
    loop = asyncio.get_running_loop()
    # Use run_in_executor to run blocking file I/O in a separate thread pool
    # (None selects the loop's default executor)
    lines_read = await loop.run_in_executor(None, count_lines_blocking, filepath)
    print(f"Finished reading '{filepath}' ({lines_read} lines read).")
    return lines_read


async def main_non_blocking_io_file():
    """
    Demonstrates running a "blocking" file operation in a non-blocking manner
    for the event loop using loop.run_in_executor (or asyncio.to_thread).
    """
    print("Non-blocking I/O File Example: Starting...")
    start_time = time.time()

    # Simulate another concurrent task that doesn't involve file I/O
    async def background_task():
        print("Background task: Doing some light work...")
        await asyncio.sleep(0.5)
        print("Background task: Finished light work.")

    file_task = asyncio.create_task(read_large_file_blocking(dummy_file_path))
    bg_task = asyncio.create_task(background_task())
    await asyncio.gather(file_task, bg_task)
    end_time = time.time()
    print(f"Non-blocking I/O File Example: All tasks complete in {end_time - start_time:.2f} seconds.")
    # Clean up the dummy file
    os.remove(dummy_file_path)


if __name__ == "__main__":
    asyncio.run(main_non_blocking_io_file())
Explanation:
This intermediate example addresses a common misconception: that asyncio makes every I/O operation non-blocking. Standard file I/O functions (open, read, write) in Python are inherently blocking. To use them in an asyncio application without blocking the event loop, run them through loop.run_in_executor (or the more convenient asyncio.to_thread in Python 3.9+). This offloads the blocking operation to a thread pool, allowing the event loop to continue processing other asynchronous tasks. The entire blocking call has to run in the worker thread, the reads as well as the open(); any part left in the coroutine still stalls the loop. You'll notice that the "Background task" runs concurrently with the file reading, demonstrating that the event loop itself remains non-blocked.
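For Python 3.9 and later, the same offloading can be written with asyncio.to_thread. The sketch below is a minimal variant under that assumption; count_lines, count_lines_async, and the temporary file are hypothetical and exist only to keep the example self-contained.

```python
import asyncio
import os
import tempfile


def count_lines(path):
    """Plain blocking helper: open the file and count its lines."""
    with open(path, "r") as f:
        return sum(1 for _ in f)


async def count_lines_async(path):
    # asyncio.to_thread (Python 3.9+) runs the blocking call in a worker thread.
    return await asyncio.to_thread(count_lines, path)


async def demo():
    # Writing this small temp file is itself blocking; it is kept inline
    # only because the file is tiny and this is a demonstration.
    with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as f:
        f.write("line\n" * 1000)
        path = f.name
    try:
        return await count_lines_async(path)
    finally:
        os.remove(path)
```

asyncio.to_thread also propagates the current context variables to the worker thread, which loop.run_in_executor does not do on its own.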
Example 3: Advanced Non-blocking I/O - Asynchronous HTTP Requests with aiohttp
import asyncio
import time

try:
    import aiohttp  # Third-party: pip install aiohttp
except ImportError:
    # The import itself is what fails when aiohttp is missing, so check here
    raise SystemExit("'aiohttp' library not found. Please install it using: pip install aiohttp")


async def fetch_url_real(session, url):
    """
    Performs an actual asynchronous HTTP GET request using aiohttp.
    """
    start_time = time.time()
    print(f"Fetching {url} (real request)...")
    try:
        async with session.get(url) as response:
            response.raise_for_status()  # Raise an exception for bad status codes
            text = await response.text()  # Await to get the response body as text
            end_time = time.time()
            print(f"Finished fetching {url} (status: {response.status}) in {end_time - start_time:.2f} seconds. Content length: {len(text)} chars.")
            return url, len(text)
    except aiohttp.ClientError as e:
        print(f"Error fetching {url}: {e}")
        return url, None


async def main_non_blocking_io_real_http():
    """
    Demonstrates true non-blocking network I/O by making multiple
    concurrent HTTP requests using aiohttp.
    """
    print("Non-blocking I/O Real HTTP Example: Starting...")
    start_overall_time = time.time()
    urls = [
        "https://www.python.org/",
        "https://docs.python.org/3/library/asyncio.html",
        "https://www.google.com/",
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch_url_real(session, url)) for url in urls]
        results = await asyncio.gather(*tasks)
    end_overall_time = time.time()
    print(f"\nNon-blocking I/O Real HTTP Example: All requests completed in {end_overall_time - start_overall_time:.2f} seconds.")
    print("Results:")
    for url, length in results:
        print(f"- {url}: {'Success, content length ' + str(length) if length is not None else 'Failed'}")
    print("Non-blocking I/O Real HTTP Example: Finished.")


if __name__ == "__main__":
    asyncio.run(main_non_blocking_io_real_http())
Explanation:
This advanced example uses aiohttp, a popular third-party library designed for asynchronous HTTP requests. This is a prime example of real-world non-blocking I/O. Instead of simulating delays, aiohttp's session.get() and response.text() methods are intrinsically asynchronous and awaitable. When await response.text() is called, the coroutine yields control while it waits for the entire response body to be received from the network. This allows the event loop to seamlessly switch to other fetch_url_real tasks, resulting in all URLs being fetched concurrently. You'll observe that the total time taken is dictated by the slowest network request, not the sum of all request times, showcasing the significant performance benefits of non-blocking I/O for network-bound applications.