Implementing Real-time Progress Streaming with Hatchet and FastAPI
Create an event stream generator
Now, let's create an event stream generator function in /backend/src/api/main.py
that subscribes to the Hatchet event stream for a specific workflow run and yields events:
import json  # needed for json.dumps below, if main.py does not already import it

def event_stream_generator(workflowRunId):
    ''' This helper function is a generator that yields events from the Hatchet event stream. '''
    stream = hatchet.client.listener.stream(workflowRunId)

    for event in stream:
        # You can filter and transform the event data sent to the client here.
        if event.type == "step_completed":
            data = json.dumps({
                "type": event.type,
                "payload": event.payload,
                "messageId": workflowRunId
            })
            # Server-sent events are framed as "data: <text>" followed by a blank line.
            yield "data: " + data + "\n\n"
This generator subscribes to the Hatchet event stream for the given workflowRunId and yields each matching event as a server-sent event, which gives you a place to filter and transform the event data before it reaches the client. In this example, only "step_completed" events are forwarded. You can filter further to suit your needs: for example, stream only the results of specific steps, or exclude certain fields from the payload.
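As a rough illustration of the extra filtering described above, the sketch below forwards only "step_completed" events from chosen steps and drops selected payload keys. FakeEvent is a hypothetical stand-in for a stream event, and its step field, the steps and drop_keys parameters, and the filtered_sse name are all assumptions made for this example, not part of Hatchet's API. It also assumes the payload is a dictionary.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Iterable, Iterator


@dataclass
class FakeEvent:
    """Hypothetical stand-in for a stream event (not a real Hatchet class)."""
    type: str
    step: str  # assumed name for the step that emitted the event
    payload: dict = field(default_factory=dict)


def filtered_sse(events: Iterable[Any], run_id: str, steps: set,
                 drop_keys: frozenset = frozenset()) -> Iterator[str]:
    """Yield SSE-framed JSON for completed events from the selected steps only."""
    for event in events:
        # Skip anything that is not a completed step we care about.
        if event.type != "step_completed" or event.step not in steps:
            continue
        # Remove payload fields that should not reach the client.
        payload = {k: v for k, v in event.payload.items() if k not in drop_keys}
        data = json.dumps({
            "type": event.type,
            "payload": payload,
            "messageId": run_id,
        })
        yield "data: " + data + "\n\n"
```

In the real generator, the same conditions would go inside the loop over the Hatchet stream, using whatever attributes the actual event objects expose.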
Create a streaming endpoint
Next, let's create a streaming GET endpoint that the client connects to in order to receive real-time progress updates:
from fastapi.responses import StreamingResponse  # if not already imported in main.py

@app.get("/message/{messageId}")
async def stream(messageId: str):
    ''' In a normal application you might use the message id to look up a workflowRunId.
    For this simple case, we have no persistence and just use the message id as the workflowRunId.
    You might also consider looking up the workflowRunId in a database and returning the results
    if the message has already been processed. '''
    workflowRunId = messageId
    return StreamingResponse(event_stream_generator(workflowRunId), media_type='text/event-stream')
This endpoint treats the messageId as the workflowRunId, subscribes to that run's event stream, and returns a StreamingResponse that delivers the event data to the client.
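To show what a client receives from this endpoint, here is a minimal sketch of a parser for the "data: ..." framing produced by the generator. The parse_sse name is hypothetical, and the sketch handles only data fields, not the full server-sent events format (event names, ids, retry hints, and comments are ignored).

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON object per SSE message found in `lines`."""
    buffer = []
    for line in lines:
        line = line.rstrip("\r\n")
        if line.startswith("data:"):
            value = line[len("data:"):]
            # A single space after the colon is part of the framing, not the data.
            if value.startswith(" "):
                value = value[1:]
            buffer.append(value)
        elif line == "" and buffer:
            # A blank line marks the end of one message.
            yield json.loads("\n".join(buffer))
            buffer = []
```

For example, feeding it the lines of one message emitted by the generator yields the original dictionary:

```python
raw = 'data: {"type": "step_completed", "payload": {"n": 1}, "messageId": "abc"}\n\n'
for message in parse_sse(raw.splitlines()):
    print(message["type"], message["payload"])
```

In practice the lines would come from a streaming HTTP response to /message/{messageId}, read line by line with whichever HTTP client you use.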
View Complete File on GitHub