We use cookies

We use cookies to ensure you get the best experience on our website. For more information on how we use cookies, please see our cookie policy.

By clicking "Accept", you agree to our use of cookies.
Learn more.

CookbooksPDF Pipeline

How to Build a PDF Processing Pipeline with Hatchet DAGs

The Durable Tasks vs DAG Workflows cookbook explains why document processing pipelines are a natural fit for DAG workflows. The steps are known in advance, the dependencies between them are explicit, and independent stages can run in parallel. This cookbook builds a concrete PDF pipeline with the following workflow:

Notice that after text extraction, the classify, summarize, and keyword extraction tasks can run concurrently because they each depend only on extract_text and not on each other. DAGs make concurrency explicit and simple to reason about. The final format step waits for all three tasks to finish before combining their outputs. At the end of this cookbook, we show how to swap the extraction task to use Reducto for production document parsing.

Setup

Prepare your environment

To run this example, you will need:

  • a working local Hatchet environment or access to Hatchet Cloud
  • a Hatchet SDK example environment (see the Quickstart)
  • optionally, a PDF text extraction library for real PDF parsing: pypdf for Python or pdf2json v4 for TypeScript. If the library is not installed, the example falls back to deterministic sample text so the DAG still runs.

Define the models

Define the workflow input and the output types for each task.

Define the DAG

Declare the workflow. All tasks will be registered on this workflow with explicit parent dependencies.

Extract text from the PDF

The first task decodes the base64 input and extracts text using a PDF library. This is the only task that touches the PDF directly; after it finishes, the rest of the pipeline works with plain text. If you later want to swap in OCR, object storage, Reducto, or a different PDF parser, this is the task you would change.

Classify the document

Now that we can read the text, we can process it in different ways independently. Let’s begin by classifying the document via keyword matching.

Summarize the text

Independently, we can summarize the text read from extract_text. To keep this example simple, our summarizer captures the first 50 words and includes the total word count in its output.

Extract keywords

The extract_keywords task also runs independently, pulling the most frequent meaningful words from the extracted text using a small stopword list and frequency counting. In fact, there is no interdependency between any of these text processing tasks, so Hatchet can run all three in parallel. Concurrency is not the only benefit of this structure. Since each parallel task in a DAG is an independent unit, task-level behavior can be configured independently for aspects such as retry policies, timeouts, and concurrency limits, among others. If keyword extraction is the slowest or most failure-prone step, its retry policy can be configured independently so that Hatchet retries it without re-running the classify or summarize tasks. In a sequential pipeline, you would need to build that isolation yourself. In a DAG, it comes from the structure.

Format the result

Hatchet runs format_result after classify_document, summarize_text, and extract_keywords have all completed. The final task receives the outputs from those parent tasks and combines them into one result.

Register and start the worker

Register the DAG workflow on a Hatchet worker and start it.

Run the pipeline

The trigger script creates a small sample PDF, base64-encodes it, and runs the workflow.

Test it

This example includes an end-to-end test that generates a tiny PDF, runs the full pipeline, and asserts on the outputs. If you run it against a local Hatchet instance or Hatchet Cloud, you can inspect the workflow run in the Hatchet dashboard.

pytest examples/pdf_pipeline/test_pdf_pipeline.py

Why this is a DAG

This pipeline is a good fit for a DAG because the processing stages are known before execution starts and the dependencies are easy to declare up front. The workflow does not need a loop, an unknown number of iterations, or a runtime decision about which tasks exist. Once extract_text completes, classify_document, summarize_text, and extract_keywords can run independently, and format_result waits for all three outputs before combining them. Each task points forward to later tasks, and no child step feeds back into an ancestor. If you later wanted to process a runtime-determined set of PDFs, a durable task could handle that outer decision and spawn this DAG once per document.

Swapping in Reducto for PDF extraction

The local example above runs without external services, but production document pipelines often need more than basic text extraction. Reducto’s Parse API can run OCR, detect document layout, and return structured chunks for content such as tables, figures, scanned pages, and multi-column documents. Since Hatchet’s DAG isolates PDF parsing in a single task, swapping in Reducto is as simple as replacing extract_text; the downstream tasks stay the same. Hatchet continues to run the workflow around the task, including retries, concurrency, and observability, while Reducto handles the document parsing itself.

Install the Reducto SDK and set REDUCTO_API_KEY in your environment.

The examples in this cookbook generate a tiny base64-encoded PDF and pass it to the workflow as input data to keep the demo self-contained. For larger PDFs, store the file in object storage such as S3 and pass a file key, URL, or Reducto file reference instead. This avoids putting large PDF payloads into workflow inputs, which can run into Hatchet’s gRPC payload size limit.

For small documents, the snippets below handle the inline result returned when result.result.type is "full". For larger documents, Reducto may return a URL result instead; production code should check that field and fetch the result from the URL if needed.

import base64
import tempfile
from pathlib import Path
from reducto import Reducto
 
@pdf_pipeline.task()
def extract_text(input: PdfInput, ctx: Context) -> ExtractOutput:
    client = Reducto()  # reads REDUCTO_API_KEY from environment
 
    decoded = base64.b64decode(input.content_base64)
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = Path(tmp_dir) / "input.pdf"
        tmp_path.write_bytes(decoded)
        upload = client.upload(file=tmp_path)
 
    result = client.parse.run(input=upload.file_id)
    text = "\n".join(chunk.content for chunk in result.result.chunks)
 
    return ExtractOutput(text=text, page_count=result.usage.num_pages)

Next steps

From here you could add more processing stages, such as language detection, entity extraction, or metadata enrichment. You could also use Reducto for richer production parsing, replace keyword extraction or classification with an LLM call, configure retries or concurrency limits for the slowest stages, or fan out to process multiple PDFs by spawning the DAG as a child workflow from a durable task. For this cookbook, the five-task DAG is enough to show the core pattern.