Streaming in Hatchet
Hatchet offers real-time streaming of results from background workers, allowing you to subscribe to workflow progress, relay individual step results, and send data from an in-progress step run. This lets you surface real-time updates and progress to users as the workflow runs, improving the interactivity of your application.
How It Works
When a workflow is triggered, Hatchet generates events for each step of the workflow, including the step's status, output, and relevant metadata. These events are generated by the background workers executing the workflow steps. By subscribing to this event stream, you can capture these events in real-time and, optionally, forward them to the frontend client.
Here's a high-level overview of the real-time progress streaming process:
1. The client triggers a workflow by sending a request to the backend API.
2. The backend API initiates the workflow using the Hatchet SDK and obtains a unique `workflowRunId`.
3. The backend API returns the `workflowRunId` to the client as a reference.
4. The client establishes a connection to a dedicated endpoint on the backend API, passing the `workflowRunId` as a parameter.
5. The backend API subscribes to the Hatchet event stream for the specified `workflowRunId`.
6. As the background workers execute the workflow steps, Hatchet generates events for each step, which are captured by the backend API through the event stream.
7. The backend API processes the events, extracts relevant information, and sends the data to the client in real time using a streaming response.
8. The client receives the streamed data and updates the user interface accordingly, providing real-time progress updates.
If the connection is lost (e.g., page reload or transient network failure), the client can reconnect to the same endpoint and resume receiving new real-time updates by re-establishing the stream at step 4.
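Steps 1–3 are a plain request/response exchange that happens before any streaming. As a minimal sketch (assuming a FastAPI backend and a `hatchet` client that has already been constructed; the `/message` path and payload shape are illustrative), the trigger endpoint might look like this:

```python
from fastapi import FastAPI

app = FastAPI()

@app.post("/message")
async def create_message(payload: dict):
    # Trigger the workflow and capture its run id (step 2)
    workflowRunId = hatchet.admin.run_workflow("ManualTriggerWorkflow", payload)

    # Hand the id back to the client so it can subscribe to progress (step 3)
    return {"messageId": workflowRunId}
```

The client then uses the returned id to open the streaming connection described in steps 4–8 (see the endpoint under "Consuming Streams on the Frontend" below).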
Listeners
Listeners are used to subscribe to the event stream for a specific workflow run. They are asynchronous generators that yield events as they are received, and you can filter and transform the event data before sending it to the client. Listeners yield workflow run events, step run events, and step stream events.
Here's an example of how to create a listener in Python:
```python
import json

# Assumes a Hatchet client has already been constructed, e.g. `hatchet = new_client()`
async def listen_for_files():
    workflowRunId = hatchet.admin.run_workflow("ManualTriggerWorkflow", {"test": "test"})
    listener = hatchet.listener.stream(workflowRunId)

    async for event in listener:
        # Filter and transform event data here before relaying it to the client
        data = json.dumps({
            "type": event.type,
            "messageId": workflowRunId
        })

        print("data: " + data + "\n\n")
```
And the equivalent in Go:

```go
workflowRunId, err := c.Admin().RunWorkflow("stream-event-workflow", &streamEventInput{
	Index: 0,
})
if err != nil {
	panic(err)
}

err = c.Subscribe().Stream(interruptCtx, workflowRunId, func(event client.StreamEvent) error {
	fmt.Println(string(event.Message))

	return nil
})
if err != nil {
	panic(err)
}
```
Streaming from a Step Context
You can also stream events from a specific step context, enabling you to stream arbitrary events, progress updates, intermediate inference results, or debugging information from a step.
```python
@hatchet.step()
def step1(self, context: Context):
    # Stream some data from the step context
    context.put_stream('hello from step1')

    # continue with the step run...
    return {"step1": "results"}
```
Streaming Files
Hatchet supports streaming base64-encoded files as part of the event payload, allowing you to transfer small to medium-sized files (under 4 MB) between the backend and frontend without waiting for a step result. For large files, consider using a file storage service and streaming the file URLs instead.
To stream a file from a step context, encode the file data as base64 and stream it as a payload:
```python
@hatchet.step()
def step1(self, context: Context):
    # Get the directory of the current script
    script_dir = os.path.dirname(os.path.abspath(__file__))

    # Construct the path to the image file relative to the script's directory
    image_path = os.path.join(script_dir, "image.jpeg")

    # Load the image file
    with open(image_path, "rb") as image_file:
        image_data = image_file.read()

    # Encode the image data as base64
    base64_image = base64.b64encode(image_data).decode('utf-8')

    # Stream the base64-encoded image data
    context.put_stream(base64_image)

    # continue with the step run...
    return {"step1": "results"}
```
For the listener, decode the base64-encoded payload and write it to a file:
```python
async def listen_for_files():
    load_dotenv()
    hatchet = new_client()

    workflowRunId = hatchet.admin.run_workflow("ManualTriggerWorkflow", {"test": "test"})
    listener = hatchet.listener.stream(workflowRunId)

    # Get the directory of the current script
    script_dir = os.path.dirname(os.path.abspath(__file__))

    # Create the "out" directory if it doesn't exist
    out_dir = os.path.join(script_dir, "out")
    os.makedirs(out_dir, exist_ok=True)

    async for event in listener:
        if event.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:
            # Decode the base64-encoded payload
            decoded_payload = base64.b64decode(event.payload)

            # Construct the path to the payload file in the "out" directory
            payload_path = os.path.join(out_dir, "payload.jpg")

            with open(payload_path, "wb") as f:
                f.write(decoded_payload)
```
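For files over the 4 MB limit, the bytes themselves should not go through the event stream. One option (a sketch only, assuming an S3-compatible object store and the `boto3` client; the bucket, key, and file names are illustrative) is to upload the file from the step and stream a short-lived download URL instead:

```python
import boto3

s3 = boto3.client("s3")

@hatchet.step()
def step1(self, context: Context):
    # Upload the large file to object storage instead of streaming its bytes
    s3.upload_file("render.mp4", "my-bucket", "renders/render.mp4")

    # Generate a short-lived download URL and stream that to the listener
    url = s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": "my-bucket", "Key": "renders/render.mp4"},
        ExpiresIn=3600,
    )
    context.put_stream(url)

    return {"step1": "results"}
```

The listener then receives a small URL payload and can hand it directly to the client for download.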
Streaming by Additional Metadata
Often it is helpful to stream from multiple workflows (e.g., child workflows spawned from a parent). To achieve this, you can specify an additional metadata key-value pair when running a workflow, which can then be used to subscribe to all events from workflows that share the same key-value pair.
Since additional metadata is propagated from parent to child workflows, this can be used to track all events originating from a specific workflow run.
Here's an example of how to create a listener:
```python
import random

# Generate a random stream key to use to track all
# stream events for this workflow run.
streamKey = "streamKey"
streamVal = f"sk-{random.randint(1, 100)}"

# Specify the stream key as additional metadata
# when running the workflow.
# This key gets propagated to all child workflows
# and can have an arbitrary property name.
workflowRun = hatchet.admin.run_workflow(
    "Parent",
    {"n": 2},
    options={"additional_metadata": {streamKey: streamVal}},
)

# Stream all events for the additional meta key value
listener = hatchet.listener.stream_by_additional_metadata(streamKey, streamVal)

async for event in listener:
    print(event.type, event.payload)
```
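To illustrate why the propagation matters (a sketch only; check the Hatchet child-workflow documentation for the exact spawning API), a parent step that spawns children does not need to re-specify the metadata. The key-value pair set when the parent run was triggered is carried along to every child, so the single listener above also receives their stream events:

```python
@hatchet.step()
def spawn_children(self, context: Context):
    # Hypothetical parent step: each spawned child inherits the additional
    # metadata set on the parent run, so its events also match the
    # stream_by_additional_metadata subscription above.
    for i in range(context.workflow_input()["n"]):
        context.spawn_workflow("Child", {"i": i})

    return {"spawned": True}
```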
Consuming Streams on the Frontend
To consume a stream from the backend, create a Streaming Response endpoint to "proxy" the stream from the Hatchet workflow run.
First, write a generator to filter and transform the event data before sending it to the client:
```python
def event_stream_generator(workflowRunId):
    """This helper function is a generator that yields events from the Hatchet event stream."""
    stream = hatchet.client.listener.stream(workflowRunId)

    for event in stream:
        # You can filter and transform event data here before it is sent to the client
        if event.type == "step_completed":
            data = json.dumps({
                "type": event.type,
                "payload": event.payload,
                "messageId": workflowRunId
            })

            yield "data: " + data + "\n\n"
```
Next, create a streaming GET endpoint that the client connects to in order to receive real-time progress updates:
@app.get("/message/{messageId}")
async def stream(messageId: str):
''' in a normal application you might use the message id to look up a workflowRunId
for this simple case, we have no persistence and just use the message id as the workflowRunId
you might also consider looking up the workflowRunId in a database and returning the results
if the message has already been processed '''
workflowRunId = messageId
return StreamingResponse(event_stream_generator(workflowRunId), media_type='text/event-stream')
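In the browser you would typically consume this endpoint with `EventSource` or a streaming `fetch`. To keep the example in Python, here is a quick way to verify the stream end to end (a sketch only, assuming the `httpx` library and that the API is running locally on port 8000):

```python
import httpx

message_id = "<workflowRunId returned by the trigger endpoint>"

# Open the SSE endpoint and print each event line as it arrives
with httpx.stream("GET", f"http://localhost:8000/message/{message_id}", timeout=None) as response:
    for line in response.iter_lines():
        if line.startswith("data: "):
            print(line)
```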