How To Pass Parameters Other Than Data Through Pool.imap() Function For Multiprocessing In Python?
Solution 1:
[This isn't a direct answer to the question, but a clearer follow up query than trying the below via the small comment box]
As a quick check, pass a job counter into spec_trans and return it alongside your result. Push the returned counters into a separate list (transformedXseq or something) and then compare that list with the order in which you submitted the jobs, i.e.
def spec_trans(d, wav_fam, threshold_val, thresh_type, iCount):
    data = np.array(d, dtype=np.float64)
    data_dec = decomposition(data, wav_fam)
    data_t = thresholding(data_dec, threshold_val, thresh_type)
    data_rec = reconstruction(data_t, wav_fam)
    return data_rec, iCount
and then within main
jobs = []
iJobs = 0
for dataBand in xmp:
    jobs.append(p.apply_async(spec_trans, args=(dataBand, wav_fam, threshold_val, thresh_type, iJobs)))
    iJobs = iJobs + 1
transformedX = []
transformedXseq = []
for jobBit in jobs:
    res = jobBit.get()
    transformedX.append(res[0])
    transformedXseq.append(res[1])
... and check the list transformedXseq to see if you've gathered the jobs back up in the sequence you submitted them. It should match!
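To try the same check without the wavelet code, here is a minimal self-contained sketch. The worker toy_trans, the helper run_in_order_check and the sample data are made up for illustration and only stand in for spec_trans and xmp:

```python
import multiprocessing as mp


def toy_trans(d, scale, i_count):
    # Hypothetical stand-in for spec_trans: scale the data, echo the job counter back
    return [x * scale for x in d], i_count


def run_in_order_check(data, scale=2, processes=2):
    # Submit one job per item, tagging each job with its submission index
    with mp.Pool(processes) as pool:
        jobs = [pool.apply_async(toy_trans, args=(band, scale, i))
                for i, band in enumerate(data)]
        # Collect in submission order; get() blocks until that job is done
        results = [job.get() for job in jobs]
    transformed = [r[0] for r in results]
    seq = [r[1] for r in results]
    return transformed, seq


if __name__ == "__main__":
    data = [[1, 2], [3, 4], [5, 6]]
    transformed, seq = run_in_order_check(data)
    # True when the results came back in the order the jobs were submitted
    print(seq == list(range(len(data))))
```

Because the results are collected by walking the jobs list in order, the counters should come back as 0, 1, 2, ... regardless of which worker process finished first.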
Solution 2:
Assuming wav_fam, threshold_val and thresh_type do not vary from call to call, first arrange for these arguments to be the first arguments to worker function spec_trans:
def spec_trans(wav_fam, threshold_val, thresh_type, d):
Now I don't see where in your pool-creation block you have defined xmp, but presumably this is an iterable. You need to modify this code as follows:
from functools import partial
import multiprocessing as mp
import time
import pywt
import tifffile

def compute_chunksize(pool_size, iterable_size):
    # Aim for roughly 4 chunks per pool process, rounding up
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    X = tifffile.imread('data/Classification/university.tif')
    # take parameters
    threshold_val = float(input("Enter the value for image thresholding: "))
    print("The available wavelet functions:", pywt.wavelist())
    wav_fam = input("Choose a wavelet function for transformation: ")
    threshold_type = ['hard', 'soft']
    print("The available thresholding types:", threshold_type)
    thresh_type = input("Choose a type for thresholding technique: ")
    start = time.time()
    p = mp.Pool(4)
    # first 3 arguments to spec_trans will be wav_fam, threshold_val and thresh_type
    worker = partial(spec_trans, wav_fam, threshold_val, thresh_type)
    # xmp is assumed to be the iterable you build elsewhere (not shown in the question)
    suitable_chunksize = compute_chunksize(4, len(xmp))
    transformedX = list(p.imap(worker, xmp, chunksize=suitable_chunksize))
    end = time.time()
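If you'd rather not reorder the parameters of spec_trans, partial can also bind the fixed arguments by keyword, which leaves d as the only positional argument for imap to fill in. A minimal sketch of that variation, where toy_trans and the values "db4", 2.0 and "hard" are placeholders of mine rather than anything from your code:

```python
import multiprocessing as mp
from functools import partial


def toy_trans(d, wav_fam, threshold_val, thresh_type):
    # Hypothetical stand-in for spec_trans, with the data argument kept first.
    # It just drops values below the threshold and reports the settings it saw.
    kept = [x for x in d if abs(x) >= threshold_val]
    return wav_fam, thresh_type, kept


# Bind the fixed parameters by keyword; only d remains to be supplied per call
worker = partial(toy_trans, wav_fam="db4", threshold_val=2.0, thresh_type="hard")

if __name__ == "__main__":
    bands = [[0.5, 3.0, -4.0], [1.0, 2.5]]
    with mp.Pool(2) as pool:
        # Each element of bands is passed to worker as d
        for result in pool.imap(worker, bands):
            print(result)
```

A partial built from a module-level function can be pickled, which is what allows the pool to ship it to the worker processes.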
To obtain improved performance over using apply_async, you must use a "suitable chunksize" value with imap. Function compute_chunksize can be used for computing such a value based on the size of your pool, which is 4, and the size of the iterable being passed to imap, which would be len(xmp). If the size of xmp is small enough such that the chunksize value computed is 1, I don't really see how imap would be significantly more performant over apply_async.
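To get a feel for when the computed chunksize stays at 1, here is the arithmetic for a pool of 4 and a few iterable sizes. The sizes are arbitrary examples, not taken from your data:

```python
def compute_chunksize(pool_size, iterable_size):
    # Split the work into roughly 4 chunks per pool process, rounding up
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize


if __name__ == "__main__":
    for n in (10, 16, 103, 1000):
        print(n, compute_chunksize(4, n))
    # 10 1
    # 16 1
    # 103 7
    # 1000 63
```

So with a pool of 4, anything up to 16 items gives a chunksize of 1, and the benefit of chunking only starts to show for larger inputs.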
Of course, you might as well just use:
transformedX = p.map(worker, xmp)
And let the pool compute its own suitable chunksize. imap has an advantage over map when the iterable is very large and not already a list. For map to compute a suitable chunksize it would first have to convert the iterable to a list just to get its length, and this could be memory inefficient. But if you know the length (or approximate length) of the iterable, then by using imap you can explicitly set a chunksize without having to convert the iterable to a list. The other advantage of imap (and of imap_unordered) over map is that you can process the results for the individual tasks as they become available (in submission order with imap, in completion order with imap_unordered), whereas with map you only get results when all the submitted tasks are complete.
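As a rough illustration of that point, the sketch below feeds imap from a generator whose length is known in advance and consumes the results one at a time. The functions scale and numbers are invented for the example, and the chunksize of 63 is simply what compute_chunksize(4, 1000) works out to:

```python
import multiprocessing as mp
from functools import partial


def scale(factor, x):
    # Hypothetical worker: the fixed parameter comes first, the data item last
    return factor * x


def numbers(n):
    # A generator: it has no len() and is never materialised as a list here
    for i in range(n):
        yield i


if __name__ == "__main__":
    n = 1000  # known in advance, so the chunksize can be set explicitly
    worker = partial(scale, 10)
    total = 0
    with mp.Pool(4) as pool:
        # Results arrive in submission order and can be used as they come in
        for result in pool.imap(worker, numbers(n), chunksize=63):
            total += result
    print(total)
```

Swapping imap for imap_unordered in the loop would hand back results in whatever order they complete, which is fine for something order-independent like a running total.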
Update
If you want to catch possible exceptions thrown by individual tasks submitted to your worker function, then stick with using imap, and use the following code to iterate the results returned by imap:
# transformedX = list(p.imap(worker, xmp, chunksize=suitable_chunksize))
transformedX = []
results = p.imap(worker, xmp, chunksize=suitable_chunksize)
import traceback
while True:
    try:
        result = next(results)
    except StopIteration: # no more results
        break
    except Exception as e:
        print('Exception occurred:', e)
        traceback.print_exc() # print stacktrace
    else:
        transformedX.append(result)
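The loop above can be tried end to end with a worker that fails on purpose. This is only a sketch: risky and collect are names I made up, and chunksize is left at its default of 1 so that a failure is reported for a single task. With a larger chunksize a failing task takes the rest of its chunk with it, so it is worth testing the behaviour with your own values.

```python
import multiprocessing as mp
import traceback


def risky(x):
    # Hypothetical worker that fails on one particular input
    if x == 3:
        raise ValueError(f"cannot process {x}")
    return x * x


def collect(results):
    # Drain an iterator of results, reporting and skipping any item
    # whose retrieval raises instead of aborting the whole run
    collected = []
    while True:
        try:
            result = next(results)
        except StopIteration:  # no more results
            break
        except Exception as e:
            print('Exception occurred:', e)
            traceback.print_exc()  # print stacktrace
        else:
            collected.append(result)
    return collected


if __name__ == "__main__":
    with mp.Pool(2) as pool:
        # The iterator must be consumed while the pool is still open
        print(collect(pool.imap(risky, range(6))))
```

The exception raised in the worker process is re-raised in the parent by next(), which is why the try/except sits around that call rather than around the imap call itself.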