Workflows in Hatchet
Now that you understand the Step concept, you can combine steps into a structured sequence of tasks, known as a workflow.
Workflows can be as simple as a single step or as complex as a multi-step process with branching logic and parallel execution. Hatchet offers a declarative-first approach, providing a clear, organized blueprint for task execution.
Declarative Workflow Design (DAGs)
Hatchet workflows are designed as a Directed Acyclic Graph (DAG), where each step is a node in the graph and the dependencies between steps are the edges. This structure ensures that workflows are organized, predictable, and free from circular dependencies. Because the sequence and dependencies of steps are defined upfront, you can compare the actual runtime state against the expected state when debugging or troubleshooting.
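To make the DAG idea concrete, here is a minimal sketch in plain Python using the standard-library graphlib module. It is not Hatchet's own scheduler. It only illustrates how a map of step names to parent steps yields an execution order, and how a circular dependency can be detected. The helper names execution_batches and has_cycle are hypothetical, and the step names are borrowed from the example workflow in this section.

```python
from graphlib import CycleError, TopologicalSorter

# Map each step to the set of parent steps it waits on.
# Step names follow the example workflow in this section.
parents = {
    "start": set(),
    "load_docs": {"start"},
    "reason_docs": {"load_docs"},
    "generate_response": {"reason_docs"},
}


def execution_batches(graph):
    """Group steps into batches; every step in a batch has all parents done."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises CycleError if the graph contains a cycle
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


def has_cycle(graph):
    """Return True if the dependency graph contains a circular dependency."""
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        return True
    return False


batches = execution_batches(parents)
print(batches)
```

Steps that land in the same batch have no dependency on each other, so in principle they could run in parallel. In this linear example every batch holds exactly one step.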
Example: A Simple DAG Workflow
Here's an example of a simple graphical DAG workflow in Hatchet, consisting of four steps:
Each step is represented as a node in the graph, and the arrows between the nodes represent the dependencies between the steps (left to right). In this example, start must complete before load_docs can start, load_docs must complete before reason_docs can start, and so on.
These relationships are defined in code by listing the required parent steps in the parents array of that step.
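# Assumes the hatchet_sdk Python package is installed and configured
from hatchet_sdk import Context, Hatchet

hatchet = Hatchet()


@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
    @hatchet.step()
    def start(self, context: Context):
        return {
            "status": "starting...",
        }

    # start must complete before load_docs can run
    @hatchet.step(parents=["start"])
    def load_docs(self, context: Context):
        # Load the relevant documents
        text_content = "..."  # placeholder for the loaded document text
        return {
            "status": "docs loaded",
            "docs": text_content,
        }

    @hatchet.step(parents=["load_docs"])
    def reason_docs(self, context: Context):
        # Read the output of the parent step
        docs = context.step_output("load_docs")["docs"]
        # Reason about the relevant docs
        research = "..."  # placeholder for the result of reasoning over docs
        return {
            "status": "writing a response",
            "research": research,
        }

    @hatchet.step(parents=["reason_docs"])
    def generate_response(self, context: Context):
        research = context.step_output("reason_docs")["research"]
        # Generate the final response from the research
        message = "..."  # placeholder for the generated response
        return {
            "status": "complete",
            "message": message,
        }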
Accessing Workflow State Within a Step
In Hatchet, each step within a workflow has access to parent step outputs. This capability allows you to pass data between steps, enabling you to build complex workflows that respond dynamically to the outcomes of previous steps.
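# Defined as a method inside a workflow class, like the steps above
@hatchet.step(parents=["previous_step"])
def my_step(self, context: Context) -> dict:
    # Input the workflow run was triggered with
    data = context.workflow_input()
    # Output returned by the parent step
    previous_step_data = context.step_output("previous_step")
    # Perform some operation
    output = {"input": data, "previous": previous_step_data}
    return output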
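The data flow between steps can also be sketched without Hatchet at all. The following toy runner is an illustration under stated assumptions, not the Hatchet engine: SketchContext and run_workflow are hypothetical names, and SketchContext only mimics the two context methods used in this section (workflow_input and step_output). It runs steps in dependency order and hands each step the outputs of its declared parents.

```python
from graphlib import TopologicalSorter


class SketchContext:
    """Hypothetical stand-in for a step context; not the Hatchet Context class."""

    def __init__(self, workflow_input, parent_outputs):
        self._input = workflow_input
        self._parent_outputs = parent_outputs

    def workflow_input(self):
        return self._input

    def step_output(self, step_name):
        # Only outputs of declared parents are visible to a step.
        return self._parent_outputs[step_name]


def run_workflow(steps, workflow_input):
    """Run steps in dependency order; `steps` maps name -> (parents, function)."""
    graph = {name: set(parents) for name, (parents, _) in steps.items()}
    outputs = {}
    for name in TopologicalSorter(graph).static_order():
        parents, fn = steps[name]
        visible = {parent: outputs[parent] for parent in parents}
        outputs[name] = fn(SketchContext(workflow_input, visible))
    return outputs


def previous_step(context):
    return {"doubled": context.workflow_input()["value"] * 2}


def my_step(context):
    previous = context.step_output("previous_step")
    return {"result": previous["doubled"] + 1}


steps = {
    "my_step": (["previous_step"], my_step),
    "previous_step": ([], previous_step),
}
results = run_workflow(steps, {"value": 20})
print(results["my_step"])
```

Even though my_step is listed first in the dictionary, the runner executes previous_step before it, because the order comes from the declared parents and not from the order of definition.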
Next Steps: Understanding Workers
With a solid grasp of Hatchet's workflow capabilities, the next concept to explore is workers. Understanding workers is the last step to deploying and managing workflows with Hatchet.