How To Make Twisted Defer Get Function Result?
Solution 1:
Pool.apply_async()
has a callback
arg that you can leverage to start the callback chain in the Deferred
.
The catch (which is absolutely crucial to remember) is that the Pool-callback-function will be executed in another thread!
Therefore, you must call reactor.callFromThread
when applying the result to the Deferred
so that the callback chain occurs in the same thread as the reactor
.
Failure to do this will result in callbacks being executed in a different thread that the reactor has no context into.
Here is a slightly modified example:
from functools import partial
from multiprocessing import Pool
import threading
import time
from twisted.internet import defer, reactor
def f(x):
time.sleep(5)
return x*x
def get_result(pool, i):
deferred = defer.Deferred() # create a Deferred that will eventually provide a result
_set_result = partial(set_result, deferred=deferred) # pass the Deferred to the apply_async callback
pool.apply_async(f, args=(i,), callback=_set_result) # execute in a separate process, supply callback fn
return deferred
def set_result(result, deferred):
"""
Set the result in the deferred
"""
print('Thread ID: %d, Setting result %d' % (threading.get_ident(), result))
reactor.callFromThread(deferred.callback, result) # execute the Deferred callback chain from the reactor thread
def display_result(result):
"""
Just display the result
"""
print('Thread ID: %d, Display %d' % (threading.get_ident(), result))
def kill_reactor(null):
print('Thread ID: %d, Stopping reactor' % threading.get_ident())
reactor.stop()
def main():
print('Thread ID: %d, Main' % threading.get_ident())
pool = Pool(processes=4)
d = get_result(pool, 3)
d.addCallback(display_result)
d.addCallback(kill_reactor)
reactor.run()
main()
#---------- OUTPUT ----------#
# Thread ID: 803872, Main
# Thread ID: 533632, Setting result 9
# Thread ID: 803872, Display 9
# Thread ID: 803872, Stopping reactor
I've printed the thread id so that you can see that set_result()
is indeed called in another thread (process?) and not in the main thread, ie reactor thread.
Ommiting reactor.callFromThread(deferred.callback, result)
in this example will cause the callbacks to executed in a thread that reactor.stop()
will not work and Twisted throws up vomit (tracebacks) everywhere!
Consider using at reactor.spawnProcess
as this will limit mistakes that you (or myself) would make otherwise.
And as always, if you can do whatever it is you're tyring to do, in a single thread and omit multiprocessing
or threading
, I'd suggest you do that instead.
Post a Comment for "How To Make Twisted Defer Get Function Result?"