Multiprocessing


The multiprocessing module is a built-in Python library that supports spawning processes using an API similar to the threading module, letting you run multiple operations concurrently across CPU cores. This matters most for CPU-bound tasks, where traditional threading is limited by the Global Interpreter Lock (GIL). Because each process runs in its own Python interpreter with its own memory space, multiprocessing sidesteps the GIL and can keep every available core busy, making it the standard choice when you need true parallelism in Python.

 

Example 1: Basic Multiprocessing - Hello World

import multiprocessing
import os

def greet_process():
    """
    A simple function to be executed by a new process.
    It prints a greeting and its process ID.
    """
    print(f"Hello from process: {os.getpid()}") # os.getpid() gets the current process ID

if __name__ == "__main__":
    print(f"Main process ID: {os.getpid()}") # Show the main process ID

    # Create a Process object, targeting the greet_process function
    my_process = multiprocessing.Process(target=greet_process)

    # Start the process, which will execute greet_process
    my_process.start()

    # Wait for the process to complete its execution
    my_process.join()

    print("Process finished.")

Explanation: This beginner-friendly example demonstrates the fundamental use of the multiprocessing.Process class. We define a simple function, greet_process, that prints a message along with its process ID. Inside the if __name__ == "__main__": guard (required on platforms that start processes with the 'spawn' method, such as Windows and macOS), we create a multiprocessing.Process instance with greet_process as its target. Calling start() launches the new process, and join() makes the main program wait for the child to complete before continuing. This is your first step toward creating and running multiple processes in Python.
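
The same example can also be written by subclassing multiprocessing.Process and overriding its run() method, a variant some codebases prefer when a process carries its own state. A minimal sketch (the class name GreetProcess is illustrative):

import multiprocessing
import os

class GreetProcess(multiprocessing.Process):
    """A Process subclass; start() arranges for run() to execute in the child."""
    def run(self):
        print(f"Hello from subclassed process: {os.getpid()}")

if __name__ == "__main__":
    p = GreetProcess()
    p.start()  # spawns the child process, which calls run()
    p.join()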

 

Example 2: Multiprocessing with Arguments

import multiprocessing
import os
import time

def worker_function(name, delay):
    """
    A worker function that takes arguments and simulates some work.
    """
    print(f"Process {name} (ID: {os.getpid()}) starting work for {delay} seconds...")
    time.sleep(delay) # Simulate a time-consuming operation
    print(f"Process {name} (ID: {os.getpid()}) finished work.")

if __name__ == "__main__":
    processes = []
    # Create multiple processes with different arguments
    p1 = multiprocessing.Process(target=worker_function, args=("Worker 1", 2))
    p2 = multiprocessing.Process(target=worker_function, args=("Worker 2", 1))
    p3 = multiprocessing.Process(target=worker_function, args=("Worker 3", 3))

    processes.append(p1)
    processes.append(p2)
    processes.append(p3)

    for p in processes:
        p.start() # Start each process

    for p in processes:
        p.join() # Wait for all processes to complete

    print("All worker processes have completed.")

Explanation: This example builds on the first by showing how to pass arguments to your target function. We define worker_function which accepts a name and a delay. When creating multiprocessing.Process instances, we use the args tuple to provide these values. This is crucial for making your processes perform specific, parameterized tasks. It's a common pattern when you need to distribute tasks with varying inputs across multiple processes.
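
Arguments can also be passed by keyword through the kwargs parameter, which reads more clearly when the target function has several parameters or defaults. A short sketch (worker here is an illustrative stand-in for worker_function):

import multiprocessing
import time

def worker(name, delay=1):
    print(f"{name} sleeping for {delay} seconds")
    time.sleep(delay)

if __name__ == "__main__":
    # kwargs passes arguments by name instead of position
    p = multiprocessing.Process(target=worker, kwargs={"name": "Worker-K", "delay": 0.5})
    p.start()
    p.join()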

 

Example 3: Using a Process Pool for Concurrent Execution

import multiprocessing
import os
import time

def calculate_square(number):
    """
    A function to calculate the square of a number, simulating a CPU-bound task.
    """
    print(f"Calculating square for {number} in process {os.getpid()}...")
    time.sleep(0.5) # Simulate computation
    return number * number

if __name__ == "__main__":
    numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    num_processes = multiprocessing.cpu_count() # Use as many processes as CPU cores

    print(f"Starting a pool with {num_processes} processes...")
    # Create a Pool of worker processes
    with multiprocessing.Pool(processes=num_processes) as pool:
        # Map the calculate_square function to each number in the list
        # This distributes the tasks across the pool's processes
        results = pool.map(calculate_square, numbers)

    print(f"Results of square calculations: {results}")
    print("Pool operations completed.")

Explanation: For more complex scenarios involving distributing a large number of similar tasks, multiprocessing.Pool is incredibly efficient. Instead of manually creating and managing individual Process objects, Pool creates a set of worker processes that automatically handle the distribution and execution of tasks. pool.map() is particularly useful for applying a function to each item in an iterable, effectively parallelizing the operation. This is a powerful technique for parallel processing and speeding up computations on multi-core systems.
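
When the task function takes more than one argument, pool.starmap() plays the role of pool.map(): each tuple in the iterable is unpacked into positional arguments. A minimal sketch (power is an illustrative helper):

import multiprocessing

def power(base, exponent):
    return base ** exponent

if __name__ == "__main__":
    pairs = [(2, 3), (3, 2), (4, 2)]
    with multiprocessing.Pool(processes=2) as pool:
        # Each (base, exponent) tuple becomes a call power(base, exponent)
        results = pool.starmap(power, pairs)
    print(results)  # [8, 9, 16]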

 

Example 4: Handling Exceptions in Multiprocessing

import multiprocessing
import os
import time

def divide_numbers(numerator, denominator):
    """
    A function that attempts division and might raise an exception.
    """
    try:
        print(f"Process {os.getpid()}: Attempting to divide {numerator} by {denominator}")
        result = numerator / denominator
        print(f"Process {os.getpid()}: Result = {result}")
        return result
    except ZeroDivisionError:
        print(f"Process {os.getpid()}: Caught ZeroDivisionError!")
        # In a real application, you might log this or return a specific error value
        return "Error: Division by zero"
    except Exception as e:
        print(f"Process {os.getpid()}: An unexpected error occurred: {e}")
        return f"Error: {e}"

if __name__ == "__main__":
    tasks = [(10, 2), (5, 0), (20, 4), (100, 10)]

    with multiprocessing.Pool(processes=2) as pool:
        # Use apply_async to get AsyncResult objects, allowing individual error handling
        async_results = [pool.apply_async(divide_numbers, args=(n, d)) for n, d in tasks]

        for i, ar in enumerate(async_results):
            try:
                # .get() will retrieve the result or re-raise any exception from the worker process
                result = ar.get(timeout=5) # Optional: add a timeout
                print(f"Task {i} result: {result}")
            except multiprocessing.TimeoutError:
                print(f"Task {i} timed out!")
            except Exception as e:
                # Catches any exception that .get() re-raises in the main process,
                # i.e. one that originally occurred inside the worker
                print(f"Error retrieving result for task {i}: {e}")

    print("All division tasks attempted.")

Explanation: In production environments, robust error handling in concurrent programs is vital. This example demonstrates how to gracefully handle exceptions that might occur within your worker processes. We use pool.apply_async() which returns an AsyncResult object. Calling .get() on this object will either return the function's result or re-raise any exception that occurred in the child process, allowing the main process to catch and manage it. This is a more advanced technique for managing process failures and ensuring program stability.
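
apply_async() also accepts callback and error_callback functions, which the pool invokes in the parent process when a task returns or raises; this avoids polling AsyncResult objects. A sketch of that pattern (divide, on_success, and on_error are illustrative names):

import multiprocessing

def divide(numerator, denominator):
    return numerator / denominator

def on_success(result):
    print(f"Callback got result: {result}")

def on_error(exc):
    print(f"Error callback got: {exc!r}")

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    for n, d in [(10, 2), (5, 0)]:
        pool.apply_async(divide, args=(n, d),
                         callback=on_success, error_callback=on_error)
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the queued tasks (and their callbacks) to finish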

 

Example 5: Daemon Processes and Termination

import multiprocessing
import time
import os

def daemon_worker():
    """
    A simple worker function designed to run indefinitely.
    """
    name = multiprocessing.current_process().name
    print(f"Daemon process {name} (ID: {os.getpid()}) starting...")
    try:
        while True:
            print(f"Daemon process {name} is alive...")
            time.sleep(1)
    except KeyboardInterrupt:
        print(f"Daemon process {name} received KeyboardInterrupt, exiting.")
    finally:
        # Runs for KeyboardInterrupt, but not when the daemon is terminated
        # automatically at main-process exit, which stops it abruptly
        print(f"Daemon process {name} exiting.")

if __name__ == "__main__":
    print(f"Main process (ID: {os.getpid()}) starting...")

    # Create a daemon process
    # Daemon processes are automatically terminated when the main process exits.
    daemon_p = multiprocessing.Process(target=daemon_worker, name="DaemonProcess", daemon=True)

    # Start the daemon process
    daemon_p.start()

    print("Daemon process started. Main process will sleep for 3 seconds...")
    time.sleep(3) # Main process does some work

    print("Main process finished its work. Daemon process will be terminated automatically.")
    # No daemon_p.join() needed for daemon processes as they are non-blocking and terminated on exit.

    print("Main process exiting.")

Explanation: Daemon processes are background processes that are automatically terminated when the main program exits. They are non-blocking, meaning the main process doesn't wait for them to finish (unless join() is explicitly called). This is useful for background tasks or services that don't need to complete before the main program can shut down. Setting daemon=True when creating a Process marks it as a daemon. This example showcases how to create and manage background processes that gracefully shut down with your main application.
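
The daemon flag can also be set as an attribute on an existing Process object, as long as this happens before start() is called. A brief sketch (background_loop is an illustrative worker):

import multiprocessing
import time

def background_loop():
    while True:
        print("background tick")
        time.sleep(0.5)

if __name__ == "__main__":
    p = multiprocessing.Process(target=background_loop)
    p.daemon = True   # must be set before start(); afterwards it raises an error
    p.start()
    time.sleep(1.5)   # main process does a little work
    # the daemon child is terminated automatically when the main process exits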


 

Creating and managing processes

Creating and managing processes in Python involves initiating new execution flows that run independently of the main program. Each process has its own memory space, which provides isolation and prevents data corruption between concurrent operations. The multiprocessing module offers robust tools for starting, monitoring, and terminating these processes, giving you fine-grained control over your concurrent applications. Mastering these techniques is fundamental for building scalable and efficient Python applications that leverage multi-core CPUs.

 

Example 1: Basic Process Creation and Start

import multiprocessing
import os
import time

def simple_task(task_id):
    """
    A function representing a simple, independent task.
    """
    print(f"Process {os.getpid()} (Task {task_id}) starting...")
    time.sleep(0.5) # Simulate work
    print(f"Process {os.getpid()} (Task {task_id}) finished.")

if __name__ == "__main__":
    print(f"Main process ID: {os.getpid()}")

    # Create the first process
    p1 = multiprocessing.Process(target=simple_task, args=(1,))

    # Create the second process
    p2 = multiprocessing.Process(target=simple_task, args=(2,))

    # Start the processes
    print("Starting processes...")
    p1.start()
    p2.start()

    # The main process continues execution immediately
    print("Main process is still running while child processes execute.")

    # Wait for both processes to complete
    p1.join()
    p2.join()

    print("All processes have finished. Main process exiting.")

Explanation: This example demonstrates the most basic form of process creation and management. We define simple_task as the function to be executed by the new processes. We then create two multiprocessing.Process objects, assign the target function and arguments, and call .start() on each. Crucially, .start() initiates the new process, and the main process continues without waiting. .join() is then used to wait for process completion, ensuring all child processes have finished before the main program proceeds. This is a core concept for running functions in separate processes.

 

Example 2: Checking Process Status and Termination

import multiprocessing
import time
import os

def long_running_task():
    """
    A task that runs for a long time, allowing us to observe its status.
    """
    print(f"Long-running process {os.getpid()} starting...")
    try:
        for i in range(10):
            print(f"Long-running process {os.getpid()} working... {i}/9")
            time.sleep(1)
    except KeyboardInterrupt:
        print(f"Long-running process {os.getpid()} received KeyboardInterrupt.")
    finally:
        # Note: this clause runs for normal exits and KeyboardInterrupt, but not
        # when the process is stopped via terminate(), which ends it abruptly
        print(f"Long-running process {os.getpid()} exiting.")

if __name__ == "__main__":
    p = multiprocessing.Process(target=long_running_task)
    p.start()

    # Check if the process is alive
    print(f"Is process alive after start? {p.is_alive()}")
    time.sleep(2.5) # Let it run for a bit

    # Terminate the process prematurely
    print("Terminating the long-running process...")
    p.terminate()

    # Give it a moment to terminate
    time.sleep(0.1)

    print(f"Is process alive after terminate? {p.is_alive()}")
    print(f"Process exit code: {p.exitcode}") # Check exit code

    p.join() # It's good practice to join even after terminate
    print("Process joined after termination.")

Explanation: This example shows how to manage a process's lifecycle beyond just starting and joining. We use p.is_alive() to check whether a process is still running, which is useful for monitoring. The most significant part is p.terminate(), which forcibly stops the process (by sending SIGTERM on Unix). A terminated process reports a negative exitcode corresponding to the signal that stopped it, typically -15 on Unix. Keep in mind that terminate() does not give the child a chance to run cleanup code (finally clauses and exit handlers are skipped), so use it carefully. This demonstrates programmatic process control and interruption.
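
On Python 3.7 and later, Process also offers kill(), which is like terminate() but sends SIGKILL on Unix, and close(), which releases the Process object's resources once the child has finished. A short sketch:

import multiprocessing
import time

def stubborn_task():
    while True:
        time.sleep(0.2)

if __name__ == "__main__":
    p = multiprocessing.Process(target=stubborn_task)
    p.start()
    time.sleep(1)
    p.kill()   # like terminate(), but uses SIGKILL on Unix (cannot be ignored)
    p.join()
    print(f"Exit code after kill(): {p.exitcode}")  # negative signal number on Unix
    p.close()  # release resources; using the object afterwards raises ValueError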

 

Example 3: Process Naming and Current Process Information

import multiprocessing
import os
import time

def process_info_task():
    """
    A function to demonstrate retrieving process-specific information.
    """
    current_process = multiprocessing.current_process()
    print(f"Process Name: {current_process.name}")
    print(f"Process ID: {current_process.pid}")
    print(f"Is Daemon: {current_process.daemon}")
    print(f"Is Alive: {current_process.is_alive()}")
    print(f"Parent Process ID: {os.getppid()}") # Get parent process ID
    time.sleep(1)
    print(f"Process {current_process.name} finished.")

if __name__ == "__main__":
    print(f"Main Process ID: {os.getpid()}")
    print(f"Main Process Name: {multiprocessing.current_process().name}")

    # Create processes with custom names
    p1 = multiprocessing.Process(target=process_info_task, name="Worker-Alpha")
    p2 = multiprocessing.Process(target=process_info_task, name="Worker-Beta", daemon=True)

    p1.start()
    p2.start()

    p1.join()
    # No join for daemon p2, it will exit with the main process

    print("Main process completed.")

Explanation: This example focuses on process identification and introspection. You can assign a name to a Process object for easier debugging and logging. multiprocessing.current_process() provides access to the current process's attributes, such as its name, pid (process ID), and whether it's a daemon. os.getppid() is also useful for determining the parent process ID. This helps in organizing and debugging your multiprocessing applications.
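
The parent can also enumerate its live children with multiprocessing.active_children(), which is convenient for monitoring or for joining whatever is still running. A small sketch (napper is an illustrative worker):

import multiprocessing
import time

def napper(seconds):
    time.sleep(seconds)

if __name__ == "__main__":
    for i in range(3):
        multiprocessing.Process(target=napper, args=(1,), name=f"Napper-{i}").start()

    # active_children() returns Process objects for children that are still running
    for child in multiprocessing.active_children():
        print(f"Alive child: {child.name} (pid={child.pid})")

    # Join whatever is still alive so the main process exits cleanly
    for child in multiprocessing.active_children():
        child.join()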

 

Example 4: Using start_method for Process Creation

import multiprocessing
import os
import time

def process_worker():
    """
    A simple worker function.
    """
    print(f"Worker process {os.getpid()} started using '{multiprocessing.get_start_method()}' method.")
    time.sleep(0.5)
    print(f"Worker process {os.getpid()} finished.")

if __name__ == "__main__":
    # Get the default start method for the current OS
    default_method = multiprocessing.get_start_method(allow_none=False)
    print(f"Default start method: {default_method}")

    # You can set the start method before creating any processes.
    # 'spawn' starts a fresh interpreter and is the default on Windows and macOS;
    # 'fork' (the default on Linux) is faster but can inherit unwanted state.
    # 'forkserver' is another option on Unix platforms.
    try:
        multiprocessing.set_start_method('spawn', force=True)
        print(f"Start method set to: {multiprocessing.get_start_method()}")
    except RuntimeError as e:
        print(f"Could not set start method to 'spawn': {e}")
        print("Continuing with default method.")

    # Create and start a process using the (potentially new) start method
    p = multiprocessing.Process(target=process_worker)
    p.start()
    p.join()

    print("Process started and joined using the configured method.")

    # Resetting the start method (optional)
    # multiprocessing.set_start_method(default_method, force=True)

Explanation: Python's multiprocessing module offers different start methods for creating new processes: 'fork', 'spawn', and 'forkserver'. The default varies by platform: 'fork' on Linux and most other Unix systems, 'spawn' on Windows and (since Python 3.8) macOS. 'spawn' is generally safer because it creates a fresh interpreter process and avoids problems with inherited resources such as locks and open file descriptors, though it starts more slowly. This example shows how to set the start method explicitly with multiprocessing.set_start_method(). This is an advanced configuration technique that matters for cross-platform compatibility and for avoiding subtle bugs in complex multiprocessing applications.
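
An alternative to changing the global start method is multiprocessing.get_context(), which returns a context object pinned to one start method without affecting the rest of the program. A hedged sketch:

import multiprocessing
import os

def worker():
    print(f"Worker {os.getpid()} running")

if __name__ == "__main__":
    # The context exposes the usual API (Process, Queue, Pool, ...) but always
    # uses the requested start method, here 'spawn'.
    ctx = multiprocessing.get_context("spawn")
    p = ctx.Process(target=worker)
    p.start()
    p.join()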

 

Example 5: Using Process for Asynchronous Task Submission (without Pool)

import multiprocessing
import os
import time

def async_task_worker(task_id, result_queue):
    """
    A worker function that performs a task and puts its result into a Queue.
    """
    print(f"Task {task_id} (Process {os.getpid()}) starting...")
    time.sleep(1) # Simulate work
    result = f"Result from Task {task_id}"
    result_queue.put((task_id, result)) # Put result into the queue
    print(f"Task {task_id} finished.")

if __name__ == "__main__":
    # A Queue is a good way to collect results from multiple processes
    results_queue = multiprocessing.Queue()
    processes = []
    num_tasks = 5

    print("Starting individual processes asynchronously...")
    for i in range(num_tasks):
        p = multiprocessing.Process(target=async_task_worker, args=(i, results_queue))
        processes.append(p)
        p.start() # Start without joining immediately

    # Main process can do other work while child processes run
    print("Main process doing other things...")
    time.sleep(0.5)

    # Wait for all processes to complete
    for p in processes:
        p.join()

    # Collect results from the queue
    print("\nCollecting results:")
    collected_results = {}
    while not results_queue.empty():
        task_id, result = results_queue.get()
        collected_results[task_id] = result
    print(f"Collected results: {collected_results}")

    print("All tasks and result collection completed.")

Explanation: While multiprocessing.Pool is great for mapping tasks, sometimes you need finer control over individual asynchronous processes. This example demonstrates how to start multiple processes without immediately waiting for them (.join()). Instead, we use a multiprocessing.Queue to collect results from the processes as they complete their work. This pattern is useful for asynchronous task processing where the main process needs to stay responsive while tasks run in the background. It showcases how to manage independent processes and gather their outputs effectively.
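
A related primitive is multiprocessing.JoinableQueue, whose task_done()/join() pair lets the main process block until every queued item has actually been processed, instead of counting results itself. A minimal sketch using a None sentinel to stop the worker:

import multiprocessing

def worker(q):
    while True:
        item = q.get()
        if item is None:      # sentinel: no more work
            q.task_done()
            break
        print(f"Handled {item}")
        q.task_done()         # mark this item as fully processed

if __name__ == "__main__":
    q = multiprocessing.JoinableQueue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()

    for item in ["a", "b", "c"]:
        q.put(item)
    q.put(None)

    q.join()   # blocks until task_done() has been called for every put() item
    p.join()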


 

Inter-process communication (Queues, Pipes)

Inter-process communication (IPC) is vital when you have multiple processes running concurrently and need them to share data or synchronize their operations. Since each process has its own memory space, direct variable sharing isn't possible. Python's multiprocessing module provides powerful IPC mechanisms like Queues and Pipes to enable safe and efficient data exchange between independent processes. Understanding these tools is key to building complex, cooperative multiprocessing applications that truly harness the power of parallelism.

 

Example 1: Basic Inter-process Communication with Queue

import multiprocessing
import os
import time

def producer(q):
    """
    A process that produces data and puts it into a Queue.
    """
    print(f"Producer process {os.getpid()} starting...")
    for i in range(5):
        item = f"Item {i}"
        print(f"Producer putting: {item}")
        q.put(item) # Put data into the queue
        time.sleep(0.5)
    q.put(None) # Signal that there's no more data
    print(f"Producer process {os.getpid()} finished.")

def consumer(q):
    """
    A process that consumes data from a Queue.
    """
    print(f"Consumer process {os.getpid()} starting...")
    while True:
        item = q.get() # Get data from the queue
        if item is None:
            break # Exit loop if a None sentinel is received
        print(f"Consumer got: {item}")
        time.sleep(0.7)
    print(f"Consumer process {os.getpid()} finished.")

if __name__ == "__main__":
    # Create a multiprocessing Queue
    my_queue = multiprocessing.Queue()

    # Create producer and consumer processes
    p = multiprocessing.Process(target=producer, args=(my_queue,))
    c = multiprocessing.Process(target=consumer, args=(my_queue,))

    # Start both processes
    p.start()
    c.start()

    # Wait for both processes to complete
    p.join()
    c.join()

    print("Producer and Consumer processes finished.")

Explanation: multiprocessing.Queue is a thread-safe and process-safe FIFO (First-In, First-Out) data structure ideal for sharing data between processes. In this producer-consumer example, the producer process puts items into the Queue using q.put(), and the consumer process retrieves them using q.get(). The None sentinel is a common pattern to signal the consumer that no more data will be produced. This is a fundamental way to achieve safe data exchange between Python processes.
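
The same pattern scales to several consumers sharing one Queue; the producer then puts one sentinel per consumer so that each of them eventually receives a None and exits. A short sketch:

import multiprocessing

def consumer(q, consumer_id):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"Consumer {consumer_id} got {item}")

if __name__ == "__main__":
    num_consumers = 2
    q = multiprocessing.Queue()

    consumers = [multiprocessing.Process(target=consumer, args=(q, i))
                 for i in range(num_consumers)]
    for c in consumers:
        c.start()

    for item in range(6):
        q.put(item)
    for _ in range(num_consumers):  # one sentinel per consumer
        q.put(None)

    for c in consumers:
        c.join()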

 

Example 2: Using Pipes for Two-Way Communication

import multiprocessing
import os
import time

def child_process_pipe(conn):
    """
    A child process that communicates via a Pipe connection.
    """
    print(f"Child process {os.getpid()} starting with Pipe.")
    conn.send("Hello from child!") # Send a message to the parent
    time.sleep(1)
    received_message = conn.recv() # Receive a message from the parent
    print(f"Child process {os.getpid()} received: {received_message}")
    conn.close() # Close the connection when done
    print(f"Child process {os.getpid()} finished.")

if __name__ == "__main__":
    # Create a Pipe: returns two connection objects (parent_conn, child_conn)
    parent_conn, child_conn = multiprocessing.Pipe()

    # Create a process, passing one end of the pipe to it
    p = multiprocessing.Process(target=child_process_pipe, args=(child_conn,))

    # Start the child process
    p.start()

    # Parent process uses its end of the pipe
    print(f"Parent process {os.getpid()} starting with Pipe.")
    message_from_child = parent_conn.recv() # Receive message from child
    print(f"Parent process {os.getpid()} received: {message_from_child}")

    time.sleep(0.5)
    parent_conn.send("Hi from parent!") # Send message to child

    # Close the parent's end of the connection
    parent_conn.close()

    # Wait for the child process to complete
    p.join()

    print("Parent and Child processes finished Pipe communication.")

Explanation: multiprocessing.Pipe creates a duplex (two-way) communication channel between two processes. It returns two connection objects, each representing one end of the pipe. One end is typically passed to the child process, while the parent holds the other. conn.send() sends data, and conn.recv() receives it. Pipes are often preferred for two-way communication between exactly two processes, providing a direct and efficient channel. This demonstrates bidirectional IPC.
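
Because recv() blocks until data arrives, a connection end also provides poll(timeout), which reports whether data is waiting so a process can keep doing other work instead of blocking indefinitely. A brief sketch (slow_sender is an illustrative child):

import multiprocessing
import time

def slow_sender(conn):
    time.sleep(1)
    conn.send("late message")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=slow_sender, args=(child_conn,))
    p.start()

    # poll() returns True if data is ready, False once the timeout expires
    while not parent_conn.poll(timeout=0.2):
        print("No data yet, doing other work...")

    print(f"Received: {parent_conn.recv()}")
    p.join()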

 

Example 3: Using Manager for Shared Data Structures (more complex IPC)

import multiprocessing
import os
import time

def modify_list_process(manager_list, process_id):
    """
    A process that modifies a shared list managed by a Manager.
    """
    print(f"Process {process_id} (ID: {os.getpid()}) starting to modify list.")
    for i in range(3):
        # Accessing and modifying the shared list
        manager_list.append(f"P{process_id}-Item{i}")
        print(f"Process {process_id} added: P{process_id}-Item{i}. Current list: {list(manager_list)}")
        time.sleep(0.2)
    print(f"Process {process_id} finished modifying list.")

if __name__ == "__main__":
    # Create a Manager object
    # Manager provides a way to create data that can be shared between different processes.
    with multiprocessing.Manager() as manager:
        # Create a shared list using the manager
        shared_list = manager.list() # This is a special list that can be shared

        processes = []
        for i in range(3):
            p = multiprocessing.Process(target=modify_list_process, args=(shared_list, i + 1))
            processes.append(p)
            p.start()

        for p in processes:
            p.join()

        print("\nAll processes finished modifying the shared list.")
        # The main process can now access the final state of the shared list
        print(f"Final shared list from main process: {list(shared_list)}")

Explanation: When you need to share more complex data structures like lists, dictionaries, or custom objects across multiple processes, multiprocessing.Manager comes to the rescue. A Manager provides a way to create shared objects that can be accessed and modified by different processes, with the manager handling the necessary synchronization. This example uses manager.list() to create a shared list. This is crucial for complex data sharing scenarios where Queues or Pipes might be too restrictive. It's a key tool for managing shared state across processes.
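
A Manager can expose other shared types as well, such as manager.dict() for key/value data. A small sketch where each worker writes its own key (record_result is an illustrative helper):

import multiprocessing
import os

def record_result(shared_dict, worker_id):
    # Each worker writes a distinct key, so no extra locking is needed here
    shared_dict[worker_id] = f"done by pid {os.getpid()}"

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        results = manager.dict()

        workers = [multiprocessing.Process(target=record_result, args=(results, i))
                   for i in range(3)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()

        print(dict(results))  # copy to a plain dict before the manager shuts down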

 

Example 4: Using Event for Process Synchronization

import multiprocessing
import time
import os

def worker_with_event(event, process_id):
    """
    A worker process that waits for an Event to be set.
    """
    print(f"Worker {process_id} (ID: {os.getpid()}) waiting for event...")
    event.wait() # Block until the event is set
    print(f"Worker {process_id} (ID: {os.getpid()}) Event received! Starting work.")
    time.sleep(1)
    print(f"Worker {process_id} (ID: {os.getpid()}) finished work.")

if __name__ == "__main__":
    # Create a multiprocessing Event
    event = multiprocessing.Event()

    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker_with_event, args=(event, i + 1))
        processes.append(p)
        p.start()

    print("Main process is doing other setup...")
    time.sleep(2) # Simulate some setup time

    print("Main process setting the event to release workers...")
    event.set() # Set the event, unblocking all waiting processes

    # Wait for all worker processes to complete
    for p in processes:
        p.join()

    print("All workers have completed their tasks after the event was set.")

Explanation: multiprocessing.Event is a simple yet powerful synchronization primitive used to coordinate the activities of multiple processes. One process can set() the event, and other processes can wait() for it. When the event is set, all processes currently waiting on it are unblocked. This example demonstrates how to synchronize the start of multiple worker processes, ensuring they only begin their main task after a specific condition (represented by the event being set) is met. This is fundamental for orchestrating concurrent operations.
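
event.wait() also accepts a timeout and returns True once the event is set, so a worker can wake up periodically rather than blocking forever on a signal that may never come. A brief sketch (patient_worker is an illustrative name):

import multiprocessing
import time

def patient_worker(start_event):
    # Wait at most 0.5 seconds per attempt; wait() returns True once set
    while not start_event.wait(timeout=0.5):
        print("Still waiting for the go signal...")
    print("Go signal received, starting work.")

if __name__ == "__main__":
    start_event = multiprocessing.Event()
    p = multiprocessing.Process(target=patient_worker, args=(start_event,))
    p.start()

    time.sleep(1.5)    # simulate setup in the main process
    start_event.set()  # release the worker
    p.join()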

 

Example 5: Combining Queue and Event for Controlled Task Processing

import multiprocessing
import os
import queue  # queue.Empty is raised when Queue.get() times out
import time

def task_processor(task_queue, result_queue, shutdown_event):
    """
    A worker process that processes tasks from a queue until a shutdown event is set.
    """
    print(f"Processor {os.getpid()} starting...")
    while not shutdown_event.is_set():
        try:
            task = task_queue.get(timeout=1) # Get task with a timeout
            print(f"Processor {os.getpid()} processing task: {task}")
            time.sleep(0.5) # Simulate task processing
            result_queue.put(f"Processed: {task}")
        except queue.Empty: # Queue.get() timed out without receiving a task
            if shutdown_event.is_set():
                break # Exit if event is set and queue is empty or timed out
    print(f"Processor {os.getpid()} shutting down.")

if __name__ == "__main__":
    tasks = ["Task A", "Task B", "Task C", "Task D", "Task E"]
    num_processors = 2

    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    shutdown_event = multiprocessing.Event() # Event to signal graceful shutdown

    # Populate the task queue
    for task in tasks:
        task_queue.put(task)

    processors = []
    for i in range(num_processors):
        p = multiprocessing.Process(
            target=task_processor,
            args=(task_queue, result_queue, shutdown_event)
        )
        processors.append(p)
        p.start()

    # Main process waits for tasks to be processed and collects results
    processed_count = 0
    while processed_count < len(tasks):
        try:
            result = result_queue.get(timeout=2)
            print(f"Main process received result: {result}")
            processed_count += 1
        except queue.Empty:
            # No result within the timeout; keep waiting (workers may still be busy)
            pass

    print("\nAll tasks are processed. Signaling processors to shut down.")
    shutdown_event.set() # Set the event to signal workers to exit

    # Wait for all processor processes to gracefully shut down
    for p in processors:
        p.join()

    print("All processors have shut down. Main process exiting.")

Explanation: This advanced example showcases a robust task-processing pattern that combines Queues for task distribution and result collection with an Event for graceful process shutdown. Worker processes continuously get tasks from task_queue, process them, and put results into result_queue. The shutdown_event acts as a signal for workers to exit their loops gracefully when the main process is done or needs to terminate them. This is a powerful and flexible approach for building scalable and controllable multi-process task systems, ensuring both efficient processing and proper termination.
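
For comparison, the standard library's concurrent.futures.ProcessPoolExecutor wraps much of this plumbing (task submission, result collection, shutdown) behind a futures API, and can be a simpler starting point for the same task-processing pattern. A hedged sketch of an equivalent setup:

from concurrent.futures import ProcessPoolExecutor, as_completed
import time

def process_task(task):
    time.sleep(0.5)  # simulate work
    return f"Processed: {task}"

if __name__ == "__main__":
    tasks = ["Task A", "Task B", "Task C", "Task D", "Task E"]

    with ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(process_task, t) for t in tasks]
        for future in as_completed(futures):
            print(future.result())  # re-raises any exception from the worker

    print("Executor shut down cleanly.")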