The multiprocessing module in Python is a built-in library that supports spawning processes using an API similar to the threading module. It allows you to run multiple operations concurrently across multiple CPU cores, which is crucial for CPU-bound tasks where threading is limited by the Global Interpreter Lock (GIL). When you need true parallelism in Python, multiprocessing is the standard tool: it sidesteps the GIL by running each task in its own process, with its own Python interpreter, so all available processor cores can be put to work.
Example 1: Basic Multiprocessing - Hello World
import multiprocessing
import os

def greet_process():
    """
    A simple function to be executed by a new process.
    It prints a greeting and its process ID.
    """
    print(f"Hello from process: {os.getpid()}")  # os.getpid() gets the current process ID

if __name__ == "__main__":
    print(f"Main process ID: {os.getpid()}")  # Show the main process ID
    # Create a Process object, targeting the greet_process function
    my_process = multiprocessing.Process(target=greet_process)
    # Start the process, which will execute greet_process
    my_process.start()
    # Wait for the process to complete its execution
    my_process.join()
    print("Process finished.")
Explanation: This beginner-friendly example demonstrates the fundamental use of the multiprocessing.Process class. We define a simple function, greet_process, that prints a message and its process ID. In the if __name__ == "__main__": block (essential on platforms like Windows and macOS, which use the 'spawn' start method), we create an instance of multiprocessing.Process, specifying greet_process as the target. Calling start() kicks off the new process, and join() ensures the main program waits for the child process to complete before continuing. This is your first step into creating and running multiple processes in Python.
Example 2: Multiprocessing with Arguments
import multiprocessing
import os
import time

def worker_function(name, delay):
    """
    A worker function that takes arguments and simulates some work.
    """
    print(f"Process {name} (ID: {os.getpid()}) starting work for {delay} seconds...")
    time.sleep(delay)  # Simulate a time-consuming operation
    print(f"Process {name} (ID: {os.getpid()}) finished work.")

if __name__ == "__main__":
    processes = []
    # Create multiple processes with different arguments
    p1 = multiprocessing.Process(target=worker_function, args=("Worker 1", 2))
    p2 = multiprocessing.Process(target=worker_function, args=("Worker 2", 1))
    p3 = multiprocessing.Process(target=worker_function, args=("Worker 3", 3))
    processes.append(p1)
    processes.append(p2)
    processes.append(p3)
    for p in processes:
        p.start()  # Start each process
    for p in processes:
        p.join()  # Wait for all processes to complete
    print("All worker processes have completed.")
Explanation: This example builds on the first by showing how to pass arguments to your target function. We define worker_function, which accepts a name and a delay. When creating multiprocessing.Process instances, we use the args tuple to provide these values. This is crucial for making your processes perform specific, parameterized tasks, and it's a common pattern when you need to distribute tasks with varying inputs across multiple processes.
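If the target function takes keyword arguments, Process also accepts a kwargs dictionary. The short sketch below is not part of the original example; it uses a hypothetical kw_worker function, but it illustrates the same idea with keyword arguments:
import multiprocessing
import time

def kw_worker(name, delay=1):
    # A parameterized task, as in the example above
    print(f"{name} working for {delay} second(s)")
    time.sleep(delay)

if __name__ == "__main__":
    # kwargs supplies keyword arguments to the target function
    p = multiprocessing.Process(
        target=kw_worker,
        kwargs={"name": "Worker-KW", "delay": 2},
    )
    p.start()
    p.join()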
Example 3: Using a Process Pool for Concurrent Execution
import multiprocessing
import os
import time

def calculate_square(number):
    """
    A function to calculate the square of a number, simulating a CPU-bound task.
    """
    print(f"Calculating square for {number} in process {os.getpid()}...")
    time.sleep(0.5)  # Simulate computation
    return number * number

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    num_processes = multiprocessing.cpu_count()  # Use as many processes as CPU cores
    print(f"Starting a pool with {num_processes} processes...")
    # Create a Pool of worker processes
    with multiprocessing.Pool(processes=num_processes) as pool:
        # Map the calculate_square function to each number in the list
        # This distributes the tasks across the pool's processes
        results = pool.map(calculate_square, numbers)
    print(f"Results of square calculations: {results}")
    print("Pool operations completed.")
Explanation: For scenarios that involve distributing a large number of similar tasks, multiprocessing.Pool is highly efficient. Instead of manually creating and managing individual Process objects, Pool creates a set of worker processes that automatically handle the distribution and execution of tasks. pool.map() is particularly useful for applying a function to each item in an iterable, effectively parallelizing the operation. This is a powerful technique for parallel processing and speeding up computations on multi-core systems.
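Note that pool.map() passes a single argument to the function. For multi-argument tasks, pool.starmap() unpacks each tuple of arguments. The following minimal sketch (with a made-up power function, not from the original example) shows the pattern:
import multiprocessing

def power(base, exponent):
    # starmap() unpacks each (base, exponent) tuple into positional arguments
    return base ** exponent

if __name__ == "__main__":
    pairs = [(2, 3), (3, 2), (4, 2), (5, 3)]
    with multiprocessing.Pool(processes=2) as pool:
        results = pool.starmap(power, pairs)
    print(results)  # [8, 9, 16, 125]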
Example 4: Handling Exceptions in Multiprocessing
import multiprocessing
import os

def divide_numbers(numerator, denominator):
    """
    A function that attempts division and might raise an exception.
    """
    try:
        print(f"Process {os.getpid()}: Attempting to divide {numerator} by {denominator}")
        result = numerator / denominator
        print(f"Process {os.getpid()}: Result = {result}")
        return result
    except ZeroDivisionError:
        print(f"Process {os.getpid()}: Caught ZeroDivisionError!")
        # In a real application, you might log this or return a specific error value
        return "Error: Division by zero"
    except Exception as e:
        print(f"Process {os.getpid()}: An unexpected error occurred: {e}")
        return f"Error: {e}"

if __name__ == "__main__":
    tasks = [(10, 2), (5, 0), (20, 4), (100, 10)]
    with multiprocessing.Pool(processes=2) as pool:
        # Use apply_async to get AsyncResult objects, allowing individual error handling
        async_results = [pool.apply_async(divide_numbers, args=(n, d)) for n, d in tasks]
        for i, ar in enumerate(async_results):
            try:
                # .get() will retrieve the result or re-raise any exception from the worker process
                result = ar.get(timeout=5)  # Optional: add a timeout
                print(f"Task {i} result: {result}")
            except multiprocessing.TimeoutError:
                print(f"Task {i} timed out!")
            except Exception as e:
                # Catches exceptions re-raised by .get() in the main process
                print(f"Error retrieving result for task {i}: {e}")
    print("All division tasks attempted.")
Explanation: In production environments, robust error handling in concurrent programs is vital. This example demonstrates how to gracefully handle exceptions that might occur within your worker processes. We use pool.apply_async(), which returns an AsyncResult object. Calling .get() on this object will either return the function's result or re-raise any exception that occurred in the child process, allowing the main process to catch and manage it. This is a more advanced technique for managing process failures and ensuring program stability.
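To see the re-raising behavior in isolation, here is a minimal sketch (with a hypothetical risky_division helper, not from the example above) in which the worker does not catch its own exception, so .get() re-raises it in the parent:
import multiprocessing

def risky_division(numerator, denominator):
    # No try/except here: any exception travels back to the parent via .get()
    return numerator / denominator

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        async_result = pool.apply_async(risky_division, args=(1, 0))
        try:
            async_result.get(timeout=5)
        except ZeroDivisionError as exc:
            # The ZeroDivisionError raised in the worker is re-raised here
            print(f"Caught in the main process: {exc}")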
Example 5: Daemon Processes and Termination
import multiprocessing
import time
import os

def daemon_worker():
    """
    A simple worker function designed to run indefinitely.
    """
    name = multiprocessing.current_process().name
    print(f"Daemon process {name} (ID: {os.getpid()}) starting...")
    try:
        while True:
            print(f"Daemon process {name} is alive...")
            time.sleep(1)
    except KeyboardInterrupt:
        print(f"Daemon process {name} received KeyboardInterrupt, exiting.")
    finally:
        print(f"Daemon process {name} exiting.")

if __name__ == "__main__":
    print(f"Main process (ID: {os.getpid()}) starting...")
    # Create a daemon process.
    # Daemon processes are automatically terminated when the main process exits.
    daemon_p = multiprocessing.Process(target=daemon_worker, name="DaemonProcess", daemon=True)
    # Start the daemon process
    daemon_p.start()
    print("Daemon process started. Main process will sleep for 3 seconds...")
    time.sleep(3)  # Main process does some work
    print("Main process finished its work. Daemon process will be terminated automatically.")
    # No daemon_p.join() here: the daemon process is terminated when the main process exits.
    print("Main process exiting.")
Explanation: Daemon processes are background processes that are automatically terminated when the main program exits. The main process does not wait for them to finish unless join() is explicitly called, which makes them useful for background tasks or services that don't need to complete before the main program can shut down. Setting daemon=True when creating a Process marks it as a daemon. This example showcases how to create and manage background processes that are shut down along with your main application.
Creating and managing processes
Creating and managing processes in Python involves initiating new execution flows that run independently of the main program. Each process has its own memory space, which provides isolation and prevents data corruption between concurrent operations. The multiprocessing module offers robust tools for starting, monitoring, and terminating these processes, giving you fine-grained control over your concurrent applications. Mastering these techniques is fundamental for building scalable and efficient Python applications that leverage multi-core CPUs.
Example 1: Basic Process Creation and Start
import multiprocessing
import os
import time

def simple_task(task_id):
    """
    A function representing a simple, independent task.
    """
    print(f"Process {os.getpid()} (Task {task_id}) starting...")
    time.sleep(0.5)  # Simulate work
    print(f"Process {os.getpid()} (Task {task_id}) finished.")

if __name__ == "__main__":
    print(f"Main process ID: {os.getpid()}")
    # Create the first process
    p1 = multiprocessing.Process(target=simple_task, args=(1,))
    # Create the second process
    p2 = multiprocessing.Process(target=simple_task, args=(2,))
    # Start the processes
    print("Starting processes...")
    p1.start()
    p2.start()
    # The main process continues execution immediately
    print("Main process is still running while child processes execute.")
    # Wait for both processes to complete
    p1.join()
    p2.join()
    print("All processes have finished. Main process exiting.")
Explanation: This example demonstrates the most basic form of process creation and management. We define simple_task as the function to be executed by the new processes. We then create two multiprocessing.Process objects, assign the target function and arguments, and call .start() on each. Crucially, .start() initiates the new process and the main process continues without waiting; .join() is then used to wait for process completion, ensuring all child processes have finished before the main program proceeds. This is a core concept for running functions in separate processes.
Example 2: Checking Process Status and Termination
import multiprocessing
import time
import os

def long_running_task():
    """
    A task that runs for a long time, allowing us to observe its status.
    """
    print(f"Long-running process {os.getpid()} starting...")
    try:
        for i in range(10):
            print(f"Long-running process {os.getpid()} working... {i}/9")
            time.sleep(1)
    except KeyboardInterrupt:
        print(f"Long-running process {os.getpid()} received KeyboardInterrupt.")
    finally:
        print(f"Long-running process {os.getpid()} exiting.")

if __name__ == "__main__":
    p = multiprocessing.Process(target=long_running_task)
    p.start()
    # Check if the process is alive
    print(f"Is process alive after start? {p.is_alive()}")
    time.sleep(2.5)  # Let it run for a bit
    # Terminate the process prematurely
    print("Terminating the long-running process...")
    p.terminate()
    # Give it a moment to terminate
    time.sleep(0.1)
    print(f"Is process alive after terminate? {p.is_alive()}")
    print(f"Process exit code: {p.exitcode}")  # Check exit code
    p.join()  # It's good practice to join even after terminate
    print("Process joined after termination.")
Explanation: This example shows how to manage the process lifecycle beyond just starting and joining. We use p.is_alive() to check whether a process is still running, which is useful for monitoring. The most significant part is p.terminate(), which forcibly stops a process; once the process has exited, its exitcode attribute is set (on Unix, a negative value indicates the signal that terminated it). Keep in mind that terminate() does not give the child process a chance to clean up resources, so use it carefully. This demonstrates programmatic process control and interruption.
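Related lifecycle methods, available since Python 3.7, are kill() and close(). The following minimal sketch (with a made-up spin worker) illustrates them; it is a sketch, not part of the example above:
import multiprocessing
import time

def spin():
    # Loop forever so the parent has something to stop
    while True:
        time.sleep(0.1)

if __name__ == "__main__":
    p = multiprocessing.Process(target=spin)
    p.start()
    time.sleep(0.5)
    p.kill()   # Like terminate(), but sends SIGKILL on Unix (Python 3.7+)
    p.join()
    print(f"Exit code after kill(): {p.exitcode}")  # Typically -9 on Unix
    p.close()  # Release the Process object's resources; further use raises ValueError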
Example 3: Process Naming and Current Process Information
import multiprocessing
import os
import time

def process_info_task():
    """
    A function to demonstrate retrieving process-specific information.
    """
    current_process = multiprocessing.current_process()
    print(f"Process Name: {current_process.name}")
    print(f"Process ID: {current_process.pid}")
    print(f"Is Daemon: {current_process.daemon}")
    print(f"Is Alive: {current_process.is_alive()}")
    print(f"Parent Process ID: {os.getppid()}")  # Get parent process ID
    time.sleep(1)
    print(f"Process {current_process.name} finished.")

if __name__ == "__main__":
    print(f"Main Process ID: {os.getpid()}")
    print(f"Main Process Name: {multiprocessing.current_process().name}")
    # Create processes with custom names
    p1 = multiprocessing.Process(target=process_info_task, name="Worker-Alpha")
    p2 = multiprocessing.Process(target=process_info_task, name="Worker-Beta", daemon=True)
    p1.start()
    p2.start()
    p1.join()
    # No join for daemon p2; it will exit with the main process
    print("Main process completed.")
Explanation: This example focuses on process identification and introspection. You can assign a name to a Process object for easier debugging and logging. multiprocessing.current_process() provides access to the current process's attributes, such as its name, pid (process ID), and whether it's a daemon. os.getppid() is also useful for determining the parent process ID. This helps in organizing and debugging your multiprocessing applications.
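A related introspection helper is multiprocessing.active_children(), which returns the live child processes of the caller. This short sketch (with a hypothetical nap worker, not from the example above) shows it in use:
import multiprocessing
import time

def nap(seconds):
    time.sleep(seconds)

if __name__ == "__main__":
    children = [multiprocessing.Process(target=nap, args=(2,), name=f"Napper-{i}") for i in range(3)]
    for c in children:
        c.start()
    # active_children() returns the Process objects of all live children
    # (it also joins any children that have already finished)
    for child in multiprocessing.active_children():
        print(f"Alive: {child.name} (pid {child.pid})")
    for c in children:
        c.join()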
Example 4: Using start_method for Process Creation
import multiprocessing
import os
import time

def process_worker():
    """
    A simple worker function.
    """
    print(f"Worker process {os.getpid()} started using '{multiprocessing.get_start_method()}' method.")
    time.sleep(0.5)
    print(f"Worker process {os.getpid()} finished.")

if __name__ == "__main__":
    # Get the default start method for the current OS
    default_method = multiprocessing.get_start_method(allow_none=False)
    print(f"Default start method: {default_method}")
    # You can set the start method before creating any processes.
    # 'spawn' is safer (and the default on Windows/macOS) but slower;
    # 'fork' (the Unix default) is faster; 'forkserver' is another option.
    try:
        multiprocessing.set_start_method('spawn', force=True)
        print(f"Start method set to: {multiprocessing.get_start_method()}")
    except RuntimeError as e:
        print(f"Could not set start method to 'spawn': {e}")
        print("Continuing with default method.")
    # Create and start a process using the (potentially new) start method
    p = multiprocessing.Process(target=process_worker)
    p.start()
    p.join()
    print("Process started and joined using the configured method.")
    # Resetting the start method (optional)
    # multiprocessing.set_start_method(default_method, force=True)
Explanation: Python's multiprocessing module offers different start methods for creating new processes: 'fork', 'spawn', and 'forkserver'. The default method varies by operating system ('fork' on most Unix-like systems, 'spawn' on Windows and macOS). 'spawn' is generally safer as it creates a fresh interpreter process, avoiding issues with inherited resources, though it can be slower. This example shows how to explicitly set the start method using multiprocessing.set_start_method(). This is an advanced process-configuration technique, important for cross-platform compatibility and for avoiding subtle bugs in complex multiprocessing applications.
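When you only need a specific start method for part of a program, multiprocessing.get_context() returns a context object bound to that method without changing the global default. A minimal sketch (with a trivial hello worker added for illustration) of that alternative:
import multiprocessing

def hello():
    print("Hello from a spawned worker")

if __name__ == "__main__":
    # get_context() returns a context bound to one start method,
    # leaving the global default untouched
    ctx = multiprocessing.get_context("spawn")
    p = ctx.Process(target=hello)
    p.start()
    p.join()
    q = ctx.Queue()  # Queues, Pools, etc. can also be created from the context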
Example 5: Using Process for Asynchronous Task Submission (without Pool)
import multiprocessing
import os
import time

def async_task_worker(task_id, result_queue):
    """
    A worker function that performs a task and puts its result into a Queue.
    """
    print(f"Task {task_id} (Process {os.getpid()}) starting...")
    time.sleep(1)  # Simulate work
    result = f"Result from Task {task_id}"
    result_queue.put((task_id, result))  # Put result into the queue
    print(f"Task {task_id} finished.")

if __name__ == "__main__":
    # A Queue is a good way to collect results from multiple processes
    results_queue = multiprocessing.Queue()
    processes = []
    num_tasks = 5
    print("Starting individual processes asynchronously...")
    for i in range(num_tasks):
        p = multiprocessing.Process(target=async_task_worker, args=(i, results_queue))
        processes.append(p)
        p.start()  # Start without joining immediately
    # Main process can do other work while child processes run
    print("Main process doing other things...")
    time.sleep(0.5)
    # Wait for all processes to complete
    for p in processes:
        p.join()
    # Collect results from the queue
    print("\nCollecting results:")
    collected_results = {}
    while not results_queue.empty():
        task_id, result = results_queue.get()
        collected_results[task_id] = result
    print(f"Collected results: {collected_results}")
    print("All tasks and result collection completed.")
Explanation: While multiprocessing.Pool is great for mapping tasks, sometimes you need finer control over individual asynchronous processes. This example demonstrates how to start multiple processes without immediately waiting for them with .join(). Instead, we use a multiprocessing.Queue to collect results from the processes as they complete their work. This pattern is useful for asynchronous task processing where the main process needs to stay responsive while tasks run in the background, and it showcases how to manage independent processes and gather their outputs effectively.
Inter-process communication (Queues, Pipes)
Inter-process communication (IPC) is vital when you have multiple processes running concurrently and need them to share data or synchronize their operations. Since each process has its own memory space, direct variable sharing isn't possible. Python's multiprocessing module provides powerful IPC mechanisms like Queues and Pipes to enable safe and efficient data exchange between independent processes. Understanding these tools is key to building complex, cooperative multiprocessing applications that truly harness the power of parallelism.
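To make the isolation concrete before the examples, here is a minimal sketch (not from the examples below, using a made-up module-level counter) showing that a variable modified in a child process does not change the parent's copy:
import multiprocessing

counter = 0  # A module-level variable in the parent process

def increment():
    global counter
    counter += 1  # Modifies the child's own copy only
    print(f"Child sees counter = {counter}")

if __name__ == "__main__":
    p = multiprocessing.Process(target=increment)
    p.start()
    p.join()
    # The parent's copy is untouched, which is why explicit IPC is needed
    print(f"Parent still sees counter = {counter}")  # Prints 0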
Example 1: Basic Inter-process Communication with Queue
import multiprocessing
import os
import time

def producer(q):
    """
    A process that produces data and puts it into a Queue.
    """
    print(f"Producer process {os.getpid()} starting...")
    for i in range(5):
        item = f"Item {i}"
        print(f"Producer putting: {item}")
        q.put(item)  # Put data into the queue
        time.sleep(0.5)
    q.put(None)  # Signal that there's no more data
    print(f"Producer process {os.getpid()} finished.")

def consumer(q):
    """
    A process that consumes data from a Queue.
    """
    print(f"Consumer process {os.getpid()} starting...")
    while True:
        item = q.get()  # Get data from the queue
        if item is None:
            break  # Exit loop if a None sentinel is received
        print(f"Consumer got: {item}")
        time.sleep(0.7)
    print(f"Consumer process {os.getpid()} finished.")

if __name__ == "__main__":
    # Create a multiprocessing Queue
    my_queue = multiprocessing.Queue()
    # Create producer and consumer processes
    p = multiprocessing.Process(target=producer, args=(my_queue,))
    c = multiprocessing.Process(target=consumer, args=(my_queue,))
    # Start both processes
    p.start()
    c.start()
    # Wait for both processes to complete
    p.join()
    c.join()
    print("Producer and Consumer processes finished.")
Explanation: multiprocessing.Queue is a thread- and process-safe FIFO (First-In, First-Out) data structure ideal for sharing data between processes. In this producer-consumer example, the producer process puts items into the Queue using q.put(), and the consumer process retrieves them using q.get(). The None sentinel is a common pattern to signal the consumer that no more data will be produced. This is a fundamental way to achieve safe data exchange between Python processes.
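A close relative is multiprocessing.JoinableQueue, which adds task_done() and join() so the producer can block until every queued item has been handled. A minimal sketch (with a hypothetical worker function, not part of the example above):
import multiprocessing

def worker(jq):
    while True:
        item = jq.get()
        if item is None:
            jq.task_done()
            break
        print(f"Handled {item}")
        jq.task_done()  # Mark this item as processed

if __name__ == "__main__":
    jq = multiprocessing.JoinableQueue()
    p = multiprocessing.Process(target=worker, args=(jq,))
    p.start()
    for item in ["a", "b", "c"]:
        jq.put(item)
    jq.put(None)  # Sentinel to stop the worker
    jq.join()     # Blocks until every put item has been marked done
    p.join()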
Example 2: Using Pipes for Two-Way Communication
import multiprocessing
import os
import time

def child_process_pipe(conn):
    """
    A child process that communicates via a Pipe connection.
    """
    print(f"Child process {os.getpid()} starting with Pipe.")
    conn.send("Hello from child!")  # Send a message to the parent
    time.sleep(1)
    received_message = conn.recv()  # Receive a message from the parent
    print(f"Child process {os.getpid()} received: {received_message}")
    conn.close()  # Close the connection when done
    print(f"Child process {os.getpid()} finished.")

if __name__ == "__main__":
    # Create a Pipe: returns two connection objects (parent_conn, child_conn)
    parent_conn, child_conn = multiprocessing.Pipe()
    # Create a process, passing one end of the pipe to it
    p = multiprocessing.Process(target=child_process_pipe, args=(child_conn,))
    # Start the child process
    p.start()
    # Parent process uses its end of the pipe
    print(f"Parent process {os.getpid()} starting with Pipe.")
    message_from_child = parent_conn.recv()  # Receive message from child
    print(f"Parent process {os.getpid()} received: {message_from_child}")
    time.sleep(0.5)
    parent_conn.send("Hi from parent!")  # Send message to child
    # Close the parent's end of the connection
    parent_conn.close()
    # Wait for the child process to complete
    p.join()
    print("Parent and Child processes finished Pipe communication.")
Explanation: multiprocessing.Pipe creates a duplex (two-way) communication channel between two processes. It returns two connection objects, each representing one end of the pipe. One end is typically passed to the child process, while the parent holds the other. conn.send() sends data, and conn.recv() receives it. Pipes are often preferred for two-way communication between exactly two processes, providing a direct and efficient channel. This demonstrates bidirectional IPC.
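Pipe(duplex=False) gives a one-way channel, and conn.poll() lets you check for data without blocking. The following minimal sketch (with a made-up sender function, not from the example above) combines the two:
import multiprocessing
import time

def sender(conn):
    time.sleep(0.5)
    conn.send("ready")
    conn.close()

if __name__ == "__main__":
    # duplex=False makes a one-way pipe: recv_end can only receive, send_end can only send
    recv_end, send_end = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=sender, args=(send_end,))
    p.start()
    # poll() checks for data without blocking (or with a timeout)
    while not recv_end.poll(0.1):
        print("Nothing yet, doing other work...")
    print(f"Received: {recv_end.recv()}")
    p.join()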
Example 3: Using Manager for Shared Data Structures (more complex IPC)
import multiprocessing
import os
import time

def modify_list_process(manager_list, process_id):
    """
    A process that modifies a shared list managed by a Manager.
    """
    print(f"Process {process_id} (ID: {os.getpid()}) starting to modify list.")
    for i in range(3):
        # Accessing and modifying the shared list
        manager_list.append(f"P{process_id}-Item{i}")
        print(f"Process {process_id} added: P{process_id}-Item{i}. Current list: {list(manager_list)}")
        time.sleep(0.2)
    print(f"Process {process_id} finished modifying list.")

if __name__ == "__main__":
    # Create a Manager object.
    # A Manager provides a way to create data that can be shared between different processes.
    with multiprocessing.Manager() as manager:
        # Create a shared list using the manager
        shared_list = manager.list()  # This is a special list that can be shared
        processes = []
        for i in range(3):
            p = multiprocessing.Process(target=modify_list_process, args=(shared_list, i + 1))
            processes.append(p)
            p.start()
        for p in processes:
            p.join()
        print("\nAll processes finished modifying the shared list.")
        # The main process can now access the final state of the shared list
        print(f"Final shared list from main process: {list(shared_list)}")
Explanation: When you need to share more complex data structures like lists, dictionaries, or custom objects across multiple processes, multiprocessing.Manager comes to the rescue. A Manager provides a way to create shared objects that can be accessed and modified by different processes, with the manager handling the necessary synchronization. This example uses manager.list() to create a shared list. This is crucial for complex data-sharing scenarios where Queues or Pipes might be too restrictive, and it's a key tool for managing shared state across processes.
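The same manager can hand out other shared types; manager.dict() is the most common. A minimal sketch (with a hypothetical record_result worker, not part of the example above) of a shared dictionary:
import multiprocessing

def record_result(shared_dict, worker_id):
    # Each worker writes under its own key, so no extra locking is needed here
    shared_dict[worker_id] = worker_id * worker_id

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        results = manager.dict()  # A shared dictionary proxy
        workers = [multiprocessing.Process(target=record_result, args=(results, i)) for i in range(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        print(dict(results))  # e.g. {0: 0, 1: 1, 2: 4, 3: 9}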
Example 4: Using Event for Process Synchronization
import multiprocessing
import time
import os

def worker_with_event(event, process_id):
    """
    A worker process that waits for an Event to be set.
    """
    print(f"Worker {process_id} (ID: {os.getpid()}) waiting for event...")
    event.wait()  # Block until the event is set
    print(f"Worker {process_id} (ID: {os.getpid()}) Event received! Starting work.")
    time.sleep(1)
    print(f"Worker {process_id} (ID: {os.getpid()}) finished work.")

if __name__ == "__main__":
    # Create a multiprocessing Event
    event = multiprocessing.Event()
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker_with_event, args=(event, i + 1))
        processes.append(p)
        p.start()
    print("Main process is doing other setup...")
    time.sleep(2)  # Simulate some setup time
    print("Main process setting the event to release workers...")
    event.set()  # Set the event, unblocking all waiting processes
    # Wait for all worker processes to complete
    for p in processes:
        p.join()
    print("All workers have completed their tasks after the event was set.")
Explanation: multiprocessing.Event is a simple yet powerful synchronization primitive used to coordinate the activities of multiple processes. One process can set() the event, and other processes can wait() for it. When the event is set, all processes currently waiting on it are unblocked. This example demonstrates how to synchronize the start of multiple worker processes, ensuring they only begin their main task after a specific condition (represented by the event being set) is met. This is fundamental for orchestrating concurrent operations.
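Events can also be cleared and re-set, which makes a simple pause/resume switch possible. The sketch below (with a made-up paced_worker, not from the example above) uses is_set() and clear() alongside set() and wait():
import multiprocessing
import time

def paced_worker(go):
    for step in range(3):
        go.wait()  # Pause here whenever the event is cleared
        print(f"Step {step} running (event set: {go.is_set()})")
        time.sleep(0.2)

if __name__ == "__main__":
    go = multiprocessing.Event()
    go.set()  # Start in the "running" state
    p = multiprocessing.Process(target=paced_worker, args=(go,))
    p.start()
    time.sleep(0.3)
    go.clear()  # Pause the worker at its next wait()
    print("Worker paused...")
    time.sleep(0.5)
    go.set()  # Resume
    p.join()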
Example 5: Combining Queue and Event for Controlled Task Processing
import multiprocessing
import os
import queue  # Provides the queue.Empty exception raised by Queue.get(timeout=...)
import time

def task_processor(task_queue, result_queue, shutdown_event):
    """
    A worker process that processes tasks from a queue until a shutdown event is set.
    """
    print(f"Processor {os.getpid()} starting...")
    while not shutdown_event.is_set():
        try:
            task = task_queue.get(timeout=1)  # Get a task, waiting at most 1 second
            print(f"Processor {os.getpid()} processing task: {task}")
            time.sleep(0.5)  # Simulate task processing
            result_queue.put(f"Processed: {task}")
        except queue.Empty:
            # No task arrived within the timeout; loop again and re-check the shutdown event
            continue
    print(f"Processor {os.getpid()} shutting down.")

if __name__ == "__main__":
    tasks = ["Task A", "Task B", "Task C", "Task D", "Task E"]
    num_processors = 2
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    shutdown_event = multiprocessing.Event()  # Event to signal graceful shutdown
    # Populate the task queue
    for task in tasks:
        task_queue.put(task)
    processors = []
    for i in range(num_processors):
        p = multiprocessing.Process(
            target=task_processor,
            args=(task_queue, result_queue, shutdown_event)
        )
        processors.append(p)
        p.start()
    # Main process waits for tasks to be processed and collects results
    processed_count = 0
    while processed_count < len(tasks):
        try:
            result = result_queue.get(timeout=2)
            print(f"Main process received result: {result}")
            processed_count += 1
        except queue.Empty:
            # No results for a while; keep waiting (a real system might check worker health here)
            pass
    print("\nAll tasks are processed. Signaling processors to shut down.")
    shutdown_event.set()  # Set the event to signal workers to exit
    # Wait for all processor processes to gracefully shut down
    for p in processors:
        p.join()
    print("All processors have shut down. Main process exiting.")
Explanation: This advanced example showcases a robust task-processing pattern that combines Queues for task distribution and result collection with an Event for graceful process shutdown. Worker processes continuously get tasks from task_queue, process them, and put results into result_queue. The shutdown_event acts as a signal for workers to exit their loops gracefully when the main process is done or needs to terminate them. This is a powerful and flexible approach for building scalable and controllable multi-process task systems, ensuring both efficient processing and proper termination.