Defining a Hatchet workflow and registering a worker
We'll now build the core of the application: the background workflow that:
- reads a website and parses the text content
- reasons about what information is most relevant to the user request
- generates a response for the user
Here's an overview of what the workflows directory will look like after we complete this module:

backend/src/workflows/
├── hatchet.py      # shared Hatchet client
├── basicrag.py     # the BasicRagWorkflow definition
└── main.py         # worker registration and start script
Instantiating the Hatchet Client
For our background service, we'll share a single Hatchet client across a number of files. Let's create a new file, /backend/src/workflows/hatchet.py, and instantiate the client:
from hatchet_sdk import Hatchet
from dotenv import load_dotenv
load_dotenv()  # we'll use dotenv to load the required Hatchet and OpenAI API keys
hatchet = Hatchet(debug=True)
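For reference, the .env file this loads needs at least two values. Based on the standard Hatchet and OpenAI SDKs, it should look something like this (the values are placeholders):

HATCHET_CLIENT_TOKEN="<your-hatchet-api-token>"
OPENAI_API_KEY="<your-openai-api-key>"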
View the complete file on GitHub.
Design your Workflow
Setup
First, let's import our required services and initialize a new OpenAI client for later use.
from .hatchet import hatchet
from hatchet_sdk import Context
from bs4 import BeautifulSoup
from openai import OpenAI
import requests
openai = OpenAI()
Define your workflow
Now, let's create a new workflow by defining a new class, BasicRagWorkflow, and decorating it with @hatchet.workflow. We're also passing the on_events argument to the decorator to indicate that we want this workflow to run on the question:create event:
@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
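As a quick aside: nothing in this file pushes the event itself. A producer (for example, your API backend) would emit it through the shared client. Here's a minimal sketch, assuming the events client exposed by the Python SDK (written as hatchet.event.push here; some SDK versions nest this under hatchet.client.event.push):

from .hatchet import hatchet

# push a question:create event; this payload becomes the workflow input
hatchet.event.push(
    "question:create",
    {
        "request": {
            "messages": [{"role": "user", "content": "how do i install hatchet?"}],
            "url": "https://docs.hatchet.run/home",
        }
    },
)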
Write your first step
Next, let's add a simple initial step to the workflow. To start, this function will do nothing but update the runtime status to "reading hatchet docs" so the client has some visibility into progress.

By defining this function within our BasicRagWorkflow class and decorating it with @hatchet.step, we declare that this step is part of the workflow.
@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
    @hatchet.step()
    def start(self, context: Context):
        return {
            "status": "reading hatchet docs",
        }
Reading a website
Next, let's add a step that reads the URL from the workflow input (context.workflow_input()), loads the contents of that page with requests, and parses the HTML content to text with Beautiful Soup.
Note: we're specifying the parents param of the @hatchet.step decorator. This means the load_docs step will run after the start step completes.
@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
    # ... previous steps

    @hatchet.step(parents=["start"])
    def load_docs(self, context: Context):
        # use beautiful soup to parse the html content
        url = context.workflow_input()['request']['url']
        html_content = requests.get(url).text
        soup = BeautifulSoup(html_content, 'html.parser')
        element = soup.find('body')
        text_content = element.get_text(separator=' | ')

        return {
            "status": "making sense of the docs",
            "docs": text_content,
        }
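One caveat worth noting: requests.get as written has no timeout and treats error pages like any other response. If you'd rather have this step fail fast (Hatchet surfaces an uncaught exception as a failed step), a small defensive variant of the fetch might look like:

# bound the request time and raise on non-2xx responses
response = requests.get(url, timeout=10)
response.raise_for_status()
html_content = response.text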
Agentic RAG
Large Language Models are great at reasoning and generating text, but they often struggle with large context windows or with handling multiple tasks at once. It is often helpful to use an intermediate model to identify the most relevant information in a large document or set of documents.

With that in mind, we can call OpenAI's less expensive GPT-3.5 Turbo model to extract the most helpful sentences in this document, which will be passed to a future step for generation. In a more complex system, you might run this step on multiple documents in parallel (see the sketch after this step's code).
Note: this step depends on the results of its load_docs parent step. These results are accessed through ctx.step_output("load_docs").
@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
    # ... previous steps

    @hatchet.step(parents=["load_docs"])
    def reason_docs(self, ctx: Context):
        message = ctx.workflow_input()['request']["messages"][-1]
        docs = ctx.step_output("load_docs")['docs']

        prompt = "The user is asking the following question:\
        {message}\
        What are the most relevant sentences in the following document?\
        {docs}"

        prompt = prompt.format(message=message['content'], docs=docs)

        model = "gpt-3.5-turbo"
        completion = openai.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": prompt},
                message
            ]
        )

        return {
            "status": "writing a response",
            "research": completion.choices[0].message.content,
        }
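To make the parallel fan-out idea above concrete: in Hatchet, sibling steps that share the same parent run concurrently, so reasoning over several documents could be sketched roughly like this (the step names here are illustrative, not part of this tutorial's code):

@hatchet.step(parents=["load_docs"])
def reason_doc_a(self, ctx: Context):
    ...

@hatchet.step(parents=["load_docs"])
def reason_doc_b(self, ctx: Context):
    ...

# a downstream step waits on both siblings and merges their results
@hatchet.step(parents=["reason_doc_a", "reason_doc_b"])
def combine_research(self, ctx: Context):
    return {
        "research": ctx.step_output("reason_doc_a")["research"]
                  + ctx.step_output("reason_doc_b")["research"],
    }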
Generate the response
Finally, let's use the reasoned context to write a response to the user.
Here we're using another context method, playground, to expose some runtime variables that can be edited interactively in the Hatchet Dashboard. This method takes a variable and lets you or your team modify its value when replaying step runs. In this example, you can improve your prompt-engineering workflow by changing the prompt in the UI, but you can expose any arbitrary variables that would be helpful for experimentation.
@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
    # ... previous steps

    @hatchet.step(parents=["reason_docs"])
    def generate_response(self, ctx: Context):
        messages = ctx.workflow_input()['request']["messages"]
        research = ctx.step_output("reason_docs")['research']

        prompt = ctx.playground("prompt", "You are a sales engineer for a company called Hatchet.\
        Help address the user's question. \
        If asked how to install, respond by saying go to the store to buy a CD.\
        Use the following context:\
        {research}")

        prompt = prompt.format(research=research)

        model = ctx.playground("model", "gpt-3.5-turbo")

        completion = openai.chat.completions.create(
            model=model,
            messages=[
                {"role": "system", "content": prompt},
            ] + messages
        )

        return {
            "completed": "true",
            "status": "idle",
            "message": completion.choices[0].message.content,
        }
Listening for Actions
Registering the Hatchet Worker
We now need to define the start script so your service can listen for and execute workflow runs.
In a new file, /backend/src/workflows/main.py, let's import our shared Hatchet client and BasicRagWorkflow, then define a start method that registers the workflow with a worker and starts the worker:
from .hatchet import hatchet
from .basicrag import BasicRagWorkflow

def start():
    worker = hatchet.worker('basic-rag-worker')
    worker.register_workflow(BasicRagWorkflow())
    worker.start()
View the complete file on GitHub.
Running the Hatchet Worker and testing the workflow
Finally, we can start the worker with:
poetry run hatchet
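For poetry run hatchet to resolve, your pyproject.toml needs a script entry pointing at the start function we just defined. The exact module path depends on your package layout, but it would look roughly like:

[tool.poetry.scripts]
hatchet = "src.workflows.main:start"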
Open the Hatchet Dashboard and navigate to the Workflows tab. You should see BasicRagWorkflow in the list. Click the workflow, then click "Trigger Workflow" and submit the form with the following input:
{
  "request": {
    "messages": [
      {
        "role": "user",
        "content": "how do i install hatchet?"
      }
    ],
    "url": "https://docs.hatchet.run/home"
  }
}
This will trigger a workflow run and display the results of each intermediate step.
View the complete file on GitHub.