
Python Multiprocessing: How To Limit The Number Of Waiting Processes?

When running a large number of tasks (with large parameters) using Pool.apply_async, the tasks are queued immediately and sit in a waiting state, and there is no limit on the number of waiting tasks, so the queued arguments can consume a large amount of memory. Is there a way to limit how many tasks can be waiting at once?
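For illustration, a pattern along these lines (the work function and argument sizes below are made up, not from the question) reproduces the issue: every apply_async call enqueues its arguments right away, regardless of how many workers are free.

import multiprocessing

def work(data):
    # Stand-in for a real task; in the problematic case each argument is large.
    return sum(data)

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    # All 1000 argument lists are queued (and held in memory) immediately,
    # even though only a few workers run at any one time.
    results = [pool.apply_async(work, ([i] * 10000,)) for i in range(1000)]
    pool.close()
    pool.join()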

Solution 1:

multiprocessing.Pool has a _taskqueue member of type multiprocessing.Queue, which takes an optional maxsize parameter; unfortunately, the pool constructs it without setting maxsize.

I'd recommend subclassing multiprocessing.Pool with a copy-paste of multiprocessing.Pool.__init__ that passes maxsize to the _taskqueue constructor.

Monkey-patching the object (either the pool or the queue) would also work, but you'd have to monkey-patch pool._taskqueue._maxsize and pool._taskqueue._sem, so it would be quite brittle:

from multiprocessing import BoundedSemaphore  # assumed import; not shown in the original

# Brittle: these are private attributes, and the task-queue implementation
# has changed across Python versions (see Solution 3).
pool._taskqueue._maxsize = maxsize
pool._taskqueue._sem = BoundedSemaphore(maxsize)

Solution 2:

Wait while pool._taskqueue is over the desired size before submitting more work:

import multiprocessing
import time

import numpy as np


def f(a, b):
    return np.linalg.solve(a, b)

def test(max_apply_size=100):
    p = multiprocessing.Pool()
    for _ in range(1000):
        p.apply_async(f, (np.random.rand(1000, 1000), np.random.rand(1000)))

        # Throttle: block until the backlog of waiting tasks shrinks.
        while p._taskqueue.qsize() > max_apply_size:
            time.sleep(1)

    p.close()
    p.join()

if __name__ == '__main__':
    test()

Solution 3:

Here is a monkey-patching alternative to the top answer:

import queue
from multiprocessing.pool import ThreadPool as Pool


class PatchedQueue:
    """
    Wrap the stdlib queue module and return a Queue(maxsize=...)
    when queue.SimpleQueue is accessed.
    """

    def __init__(self, simple_queue_max_size=5000):
        self.simple_max = simple_queue_max_size

    def __getattr__(self, attr):
        if attr == "SimpleQueue":
            return lambda: queue.Queue(maxsize=self.simple_max)
        return getattr(queue, attr)


class BoundedPool(Pool):
    # Override queue in this scope to use the patcher above
    queue = PatchedQueue()

pool = BoundedPool()
pool.apply_async(print, ("something",))

This is working as expected for Python 3.8, where multiprocessing.Pool uses queue.SimpleQueue to set up the task queue. It sounds like the implementation of multiprocessing.Pool may have changed since 2.7.

Solution 4:

You could add an explicit Queue with a maxsize parameter and use queue.put() instead of pool.apply_async() in this case. Then the worker processes could:

for a, b in iter(queue.get, sentinel):
    # process it
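A minimal, self-contained sketch of this explicit-queue approach might look like the following (the worker function, the sentinel value, and the queue sizes are illustrative assumptions, not part of the original answer):

import multiprocessing

import numpy as np

SENTINEL = None  # illustrative marker meaning "no more work"

def worker(task_queue, result_queue):
    # Consume (a, b) pairs until the sentinel is received.
    for a, b in iter(task_queue.get, SENTINEL):
        result_queue.put(np.linalg.solve(a, b))

def main():
    task_queue = multiprocessing.Queue(maxsize=10)  # bounds the waiting tasks
    result_queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker,
                                       args=(task_queue, result_queue))
               for _ in range(multiprocessing.cpu_count())]
    for w in workers:
        w.start()

    n_tasks = 100
    for _ in range(n_tasks):
        # put() blocks once maxsize items are waiting, throttling the producer.
        task_queue.put((np.random.rand(1000, 1000), np.random.rand(1000)))
    for _ in workers:
        task_queue.put(SENTINEL)  # one sentinel per worker

    results = [result_queue.get() for _ in range(n_tasks)]
    for w in workers:
        w.join()

if __name__ == '__main__':
    main()

Because the task queue is bounded, at most maxsize argument pairs are ever waiting in memory, no matter how many tasks the producer loop tries to submit.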

If you want to limit the number of created input arguments/results that are in memory to approximately the number of active worker processes, then you could use the pool.imap*() methods:

#!/usr/bin/env python
import multiprocessing
import numpy as np

def f(a_b):
    # imap* passes a single argument, so the (a, b) pair is unpacked here.
    return np.linalg.solve(*a_b)

def main():
    args = ((np.random.rand(1000, 1000), np.random.rand(1000))
            for _ in range(1000))
    p = multiprocessing.Pool()
    # Arguments are drawn lazily from the generator, so only roughly as many
    # input/result pairs as there are active workers exist in memory at once.
    for result in p.imap_unordered(f, args, chunksize=1):
        pass
    p.close()
    p.join()

if __name__ == '__main__':
    main()
