Combining Chains, Groups And Chunks With Celery
Solution 1:
Using CeleryBeat for cron-like tasks is a good idea.
I would try to catch exceptions in your get_url_content
micro-tasks. Just return something else when you catch them. This way, you can evaluate (e.g. count, list, inspect) them in a summarize_task.
How to use chunks and chain chunks with another task:
Step 1: Convert the chunk to a group:
As described in http://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks, .group()
transforms an object of type celery.canvas.chunks
into a group, which is a much more common type in Celery.
Step 2: Chain a group and a task
The "Blow your mind by combining" section in http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives mentions:
Chaining a group together with another task will automatically upgrade it to be a chord
Here is some code with the two tasks and how I usually call them:
@app.task
def solve_micro_task(arg: str) -> dict:
...
@app.task
def summarize(items: List[List[dict]]):
flat_list = [item for sublist in items for item in sublist]
for report in flat_list:
...
chunk_tasks = solve_micro_task.chunks(<your iterator, e.g., a list>), 10) # type: celery.canvas.chunks
summarize_task = summarize.s()
chain(chunk_tasks.group(), summarize_task)()
Post a Comment for "Combining Chains, Groups And Chunks With Celery"