Appearance
Distributed Processes
In the context of Thread and Process, it is recommended to prefer Process, as Processes are more stable and can be distributed across multiple machines, whereas Threads can only be distributed among multiple CPUs on the same machine.
The Python multiprocessing
module not only supports multi-process operations but also allows the managers
submodule to distribute processes across multiple machines. A service process can act as a scheduler, distributing tasks to other processes, relying on network communication. The managers
module is well-encapsulated, so you don't need to understand the details of network communication to easily write distributed multi-process programs.
For example, if we already have a multi-process program running on the same machine using Queue communication, and we want to distribute the task-sending process and the task-processing process across two machines due to heavy processing tasks. How can we implement this with distributed processes?
The existing Queue can still be used, but by exposing the Queue over the network using the managers
module, other machines' processes can access the Queue.
First, let’s look at the service process, which is responsible for starting the Queue, registering it to the network, and then writing tasks into the Queue:
python
# task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
# Task sending queue:
task_queue = queue.Queue()
# Result receiving queue:
result_queue = queue.Queue()
# QueueManager inheriting from BaseManager:
class QueueManager(BaseManager):
pass
# Register both Queues to the network, callable parameter associates with the Queue object:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# Bind to port 5000 and set authentication key 'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# Start the Queue:
manager.start()
# Obtain the Queue objects accessible via network:
task = manager.get_task_queue()
result = manager.get_result_queue()
# Add several tasks:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# Retrieve results from the result queue:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# Shutdown:
manager.shutdown()
print('master exit.')
Note that when writing a multi-process program on a single machine, the created Queue can be used directly. However, in a distributed multi-process environment, tasks cannot be added directly to the original task_queue
, as that would bypass the encapsulation of the QueueManager. Tasks must be added through the Queue interface obtained via manager.get_task_queue()
.
Next, we will start the task process on another machine (it can also be started on the local machine):
python
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# Create a similar QueueManager:
class QueueManager(BaseManager):
pass
# Since this QueueManager only retrieves Queues from the network, only names are provided during registration:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# Connect to the server, which is the machine running task_master.py:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# Ensure the port and authentication key match exactly with those set in task_master.py:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# Connect from the network:
m.connect()
# Get the Queue objects:
task = m.get_task_queue()
result = m.get_result_queue()
# Fetch tasks from the task queue and write results to the result queue:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('task queue is empty.')
# End processing:
print('worker exit.')
The task process must connect to the service process over the network, so the IP of the service process must be specified.
Now, we can test the effectiveness of distributed processes. First, start the task_master.py
service process:
bash
$ python3 task_master.py
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...
After the task_master.py
process sends the tasks, it begins to wait for results from the result queue. Now, start the task_worker.py
process:
bash
$ python3 task_worker.py
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.
As the task_worker.py
process concludes, the task_master.py
process continues to print the results:
Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956
What is the use of this simple Master/Worker model? In fact, this represents a simple but real distributed computation. With slight modifications to the code, by launching multiple workers, tasks can be distributed across several or even dozens of machines. For instance, changing the code for calculating n*n
to sending emails would implement asynchronous sending of an email queue.
Where is the Queue object stored? Note that there is no Queue creation code in task_worker.py
, so the Queue object is stored in the task_master.py
process:
│
┌─────────────────────────────────────────┐ ┌──────────────────────────────────────┐
│task_master.py │ │ │task_worker.py │
│ │ │ │
│ task = manager.get_task_queue() │ │ │ task = manager.get_task_queue() │
│ result = manager.get_result_queue() │ │ result = manager.get_result_queue() │
│ │ │ │ │ │ │
│ │ │ │ │ │
│ ▼ │ │ │ │ │
│ ┌─────────────────────────────────┐ │ │ │ │
│ │QueueManager │ │ │ │ │ │
│ │ ┌────────────┐ ┌──────────────┐ │ │ │ │ │
│ │ │ task_queue │ │ result_queue │ │◀───┼──┼──┼──────────────┘ │
│ │ └────────────┘ └──────────────┘ │ │ │ │
│ └─────────────────────────────────┘ │ │ │ │
└─────────────────────────────────────────┘ └──────────────────────────────────────┘
The Queue is accessible over the network through the QueueManager. Since the QueueManager manages more than one Queue, each Queue’s network calling interface needs a name, such as get_task_queue
.
What is the purpose of the authkey
? This ensures that the two machines can communicate normally without being maliciously interfered with by other machines. If the authkey
in task_worker.py
does not match the authkey
in task_master.py
, the connection will certainly fail.
Summary
Python's distributed process interface is simple and well-encapsulated, suitable for environments where heavy tasks need to be distributed across multiple machines.
Note that the purpose of the Queue is to transmit tasks and receive results, and the amount of data describing each task should be minimized. For instance, when sending a task to process a log file, do not send the log file itself, which may be several hundred megabytes, but rather send the complete path where the log file is stored, allowing the Worker process to read the file from the shared disk.