Child Workflows
While workflows in Hatchet are defined as a DAG, there are many cases where the structure of a workflow isn't known ahead of time. For example, you may have a batch processing workflow where the number of tasks is determined by the input, such as running a workflow per page in a PDF. In these cases, you can use child workflows to dynamically create new workflows as needed.
Note that spawning a child workflow is a durable operation: the state of the parent workflow is persisted at the point of the spawn, so if the parent workflow is interrupted, it picks up the child workflow in the exact same state when it resumes. The index of the child workflow is used as the default key, but custom keys can be specified if the order of the child workflows can vary.
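For example, if each child corresponds to a document whose processing order can vary, you might key children by a stable identifier rather than by index. A minimal sketch of that pattern, where the workflow names, input fields, and document IDs are illustrative rather than part of any existing setup:

@hatchet.workflow("fanout:documents")
class DocumentParent:
    @hatchet.step()
    def spawn(self, context: Context):
        document_ids = context.workflow_input()["document_ids"]
        for doc_id in document_ids:
            # Key each child by its document ID so the key stays stable
            # even if the order of the documents changes between runs
            context.spawn_workflow(
                "process-document", {"document_id": doc_id}, key=f"doc-{doc_id}"
            )

        return {}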
Looping Workflows
To loop through a list and spawn a child workflow per element, you can simply call spawn_workflow in a loop. For example:
@hatchet.workflow("fanout:create")
class Parent:
    @hatchet.step()
    def spawn(self, context: Context):
        for i in range(10):
            context.spawn_workflow(
                "Child", {"a": str(i)}, key=f"child{i}"
            )

        return {}
Note that calling spawn_workflow returns immediately, and the child workflows will run in parallel. If you want to wait for child workflows to finish, you can await the results of each workflow (see below).
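If you only need to wait on a single child inline, a minimal sketch (reusing the aio.spawn_workflow and result() calls from the scatter/gather example below) looks like this:

@hatchet.workflow()
class Parent:
    @hatchet.step()
    async def spawn(self, context: Context):
        # Spawn one child and block until it finishes
        child = await context.aio.spawn_workflow("Child", {"a": "1"}, key="child0")
        result = await child.result()

        return {"child_result": result}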
Spawning Workflows in Bulk
If you have a large number of workflows to spawn, you can use the spawn_workflows method to spawn multiple workflows in parallel. This method returns a list of workflow reference objects that you can use to wait for the results of each workflow.
The advantage of doing this over spawning workflows sequentially in a loop is that the workflows are spawned in parallel with a single request to Hatchet instead of one request per workflow.
There is a maximum limit of 1000 workflows that can be spawned in a single request.
@hatchet.workflow("fanout:create")
class Parent:
    @hatchet.step()
    def spawn(self, context: Context):
        workflow_requests = [
            {
                "workflow": "Child",
                "input": {"a": str(i)},
                "key": f"child{i}",
                "options": {}
            }
            for i in range(10)
        ]

        # Spawn the workflows in bulk using spawn_workflows
        context.spawn_workflows(workflow_requests)

        return {}
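If you need to spawn more than 1,000 children, one approach is to split the request list into batches before calling spawn_workflows. A minimal sketch, where the chunk helper, batch size, and workflow name are illustrative rather than part of the SDK:

def chunk(requests, size=1000):
    # Yield successive slices of at most `size` requests
    for start in range(0, len(requests), size):
        yield requests[start:start + size]

@hatchet.workflow("fanout:create-large")
class LargeParent:
    @hatchet.step()
    def spawn(self, context: Context):
        workflow_requests = [
            {
                "workflow": "Child",
                "input": {"a": str(i)},
                "key": f"child{i}",
                "options": {}
            }
            for i in range(2500)
        ]

        # Each call stays under the 1,000-workflow-per-request limit
        for batch in chunk(workflow_requests):
            context.spawn_workflows(batch)

        return {}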
Scatter/Gather Workflows
To run all child workflows in parallel and then wait for all of them to complete, you can use the result method on the returned workflow reference. Collecting each result awaitable first and then awaiting them together with asyncio.gather lets all of the children run concurrently:
import asyncio

@hatchet.workflow()
class Parent:
    @hatchet.step(timeout="5m")
    async def spawn(self, context: Context):
        results = []
        for i in range(10):
            results.append(
                (
                    await context.aio.spawn_workflow(
                        "Child", {"a": str(i)}, key=f"child{i}"
                    )
                ).result()
            )

        # Wait for all children to finish in parallel
        result = await asyncio.gather(*results)

        return {"results": result}
@hatchet.workflow()
class Child:
    @hatchet.step()
    async def process(self, context: Context):
        a = context.workflow_input()["a"]
        return {"status": "success " + a}
Error Handling
If child workflows fail, an error will be raised to the parent, which can be caught and handled as needed. For example, to spawn a recovery workflow if a child workflow fails:
@hatchet.workflow("fanout:create")
class Parent:
    @hatchet.step()
    async def spawn(self, context: Context):
        for i in range(10):
            try:
                # Await the child's result so that a failure raises here
                await (
                    await context.aio.spawn_workflow(
                        "Child", {"a": str(i)}, key=f"child{i}"
                    )
                ).result()
            except Exception as e:
                # Spawn a recovery workflow for the failed child
                context.spawn_workflow("recovery-workflow", {"error": str(e)})

        return {}
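If you are fanning out to many children in parallel, you may prefer to collect failures rather than let the first exception end the step. A sketch built on asyncio.gather with return_exceptions=True and the same aio.spawn_workflow and result() calls as the scatter/gather example above; the recovery behavior is illustrative:

import asyncio

@hatchet.workflow()
class Parent:
    @hatchet.step(timeout="5m")
    async def spawn(self, context: Context):
        pending = []
        for i in range(10):
            ref = await context.aio.spawn_workflow(
                "Child", {"a": str(i)}, key=f"child{i}"
            )
            pending.append(ref.result())

        # return_exceptions=True keeps one failed child from cancelling the rest
        outcomes = await asyncio.gather(*pending, return_exceptions=True)

        failures = [o for o in outcomes if isinstance(o, Exception)]
        for error in failures:
            # Spawn a recovery workflow for each failed child
            context.spawn_workflow("recovery-workflow", {"error": str(error)})

        return {"succeeded": len(outcomes) - len(failures), "failed": len(failures)}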