We use cookies

We use cookies to ensure you get the best experience on our website. For more information on how we use cookies, please see our cookie policy.

By clicking "Accept", you agree to our use of cookies.
Learn more.

GuideFrom Celery to Hatchet

Migrating from Celery to Hatchet

Celery is a mature and widely used background task system. This guide assumes you have already decided to migrate a Python project from Celery to Hatchet and want to understand what code and configuration need to change. Each section starts with a common Celery pattern, then shows what replaces it in Hatchet and what to watch out for.

This guide covers common Celery patterns. It does not cover every Celery setting or attempt a full infrastructure migration. For a discussion of Celery’s operational trade-offs, see this blog post.

Migration pattern lookup

Use this table to find the Celery pattern you have in your project and jump to the section that shows the Hatchet replacement. The table is a lookup aid; the sections below explain the migration details, caveats, and code changes.

StepCeleryHatchet replacementMigration category
1celery[...] + broker dependencieshatchet-sdkDependency change
1Celery config / env varsHATCHET_CLIENT_TOKENOperational change
1Worker / Beat / Flower processesHatchet worker + engine/cloudOperational change
2Celery("app", broker=..., backend=...)Hatchet() + HATCHET_CLIENT_TOKENOperational change
3@app.task / @shared_task@hatchet.task()Small rewrite
3def my_task(arg1, arg2)def my_task(input: MyInput, ctx: Context)Small rewrite
4task.delay(...) / .apply_async(...).run(..., wait_for_result=False) / .aio_run(..., wait_for_result=False)Small rewrite
5celery -A app workerhatchet worker dev or worker scriptSmall rewrite
6autoretry_for / self.retry()retries + backoff_factor + NonRetryableExceptionSmall rewrite
6time_limit / soft_time_limitexecution_timeout / schedule_timeoutDirect API swap
7apply_async(countdown=...) / eta=...task.aio_schedule(run_at, input)Small rewrite
7beat_schedule + celery beat processon_crons=["..."] in task definitionSmall rewrite
8chain(a.s(), b.s())DAG workflow with parents=[a]Conceptual redesign
8group(a.s(), b.s())Parallel DAG tasks (no parents)Conceptual redesign
8chord(group, callback)DAG task with multiple parentsConceptual redesign
9Result backend + AsyncResult + FlowerHatchet run history + dashboardOperational change
3task_serializer (pickle/msgpack)JSON via PydanticSmall rewrite
10task_routes / queues / routingWorker registration + worker affinityConceptual redesign
10Celery signals (task_prerun, etc.)on_failure_task / on_success_task / ctx.log()Conceptual redesign
10revoke / task cancellationCancellation API + dashboard + ctx.is_cancelledSmall rewrite
10Task priorityPriority (1-3 levels)Operational change
10task_always_eager / testing.mock_run() / .aio_mock_run()Small rewrite

Step 1: Update dependencies and runtime configuration

Dependencies

Install hatchet-sdk alongside Celery using your chosen package manager. For example:

pip install hatchet-sdk

Celery and Hatchet can run side-by-side during a migration. This lets you move Celery tasks to Hatchet one at a time and validate behavior incrementally. Since Celery and Hatchet do not share a workflow runtime, each migrated task or workflow must have a clear boundary. Once a task or workflow is moved to Hatchet, update the application code that enqueues it to call Hatchet instead of Celery.

Configuration

Celery projects configure broker and result backend connections. Hatchet does not have equivalent broker or result-backend settings. Instead, configure the Hatchet SDK so it can connect to the Hatchet engine. For Hatchet Cloud, configure the SDK with an API token:

export HATCHET_CLIENT_TOKEN="your-token-here"

For self-hosted or local Hatchet deployments, you may need environment-specific client settings for the Hatchet engine endpoint and TLS configuration.

Celery’s default loader reads settings from a celeryconfig.py module on the Python path. Framework integrations may load the same settings from another source; for example, Django projects commonly load Celery settings from Django settings with a CELERY_ namespace. See the Celery configuration reference for the full list. If your project has an extensive Celery configuration, review each section of this guide to determine which settings need migration and which can be removed.

Process and deployment cleanup

Celery projects often run multiple long-lived processes, including workers, a task scheduler named Celery Beat, and optionally Flower for runtime monitoring. Depending on your environment, these may be managed through init scripts, systemd services, supervisor, launchd, Docker Compose, Kubernetes manifests, a Procfile, or CI/deployment scripts. Celery’s daemonization docs provide guidance on managing these processes and are a useful resource for understanding an existing deployment.

Replace Celery worker and Beat processes with Hatchet worker scripts. Hatchet cron runs are managed by the Hatchet engine, so there is no separate scheduler process to deploy. Hatchet’s dashboard replaces the basic Flower deployment path for runtime visibility.

Broker and result backend services are a separate cleanup step. Do not remove Redis, RabbitMQ, SQS, SQL databases, Cassandra, or other infrastructure just because Celery is being removed. Remove or decommission those services only after confirming they were used exclusively for Celery and are not still used by your application.

Step 2: Replace the Celery app with a Hatchet client

A Celery project usually has a shared Celery app instance that defines the broker, result backend, and task registry. In Hatchet, replace that shared app object with a shared Hatchet client.

Celery:

from celery import Celery
 
app = Celery("tasks", broker="redis://localhost:6379", backend="redis://localhost:6379")

Hatchet:

from hatchet_sdk import Hatchet
 
hatchet = Hatchet()

The Hatchet() client reads its SDK configuration from HATCHET_CLIENT_* environment variables by default, including HATCHET_CLIENT_TOKEN. Create a Hatchet instance in a shared module and import it wherever your code needs to interact with Hatchet.

For self-hosted or local deployments, you may need additional client configuration such as the Hatchet API endpoint, gRPC host, or TLS settings. These can be supplied through environment variables, and the SDK also supports explicit client configuration when needed.

What changed:

  • The shared app object changes. Replace the shared Celery app = Celery(...) instance with a shared hatchet = Hatchet() client.
  • Broker and backend settings move out of the app constructor. Hatchet does not take Celery-style broker= or backend= arguments.
  • Connection settings move to SDK configuration. Use HATCHET_CLIENT_* environment variables, or explicit client configuration when needed.
  • The same client is reused across the migration. Import the shared hatchet client where you define tasks, workflows, workers, and code that triggers runs.

Step 3: Convert task definitions

Celery tasks accept optional task-specific arguments. In Hatchet, task functions receive one input object and a context object instead of positional args.

Celery:

from celery import Celery
 
app = Celery("tasks", broker="redis://localhost:6379")
 
@app.task
def process_image(image_url: str, filters: list[str]) -> dict:
    result = resize(image_url, filters)
    return {"processed_url": result}

To migrate the Celery task, define an input model for image_url and filters, change the decorator to @hatchet.task(...), add ctx: Context, and return a serializable task output.

Hatchet:

What changed:

  • @app.task becomes @hatchet.task(...). Use input_validator=YourModel to validate and type the task input.
  • Positional arguments move into an input model. Replace (image_url, filters) with input: ImageInput, where ImageInput is a Pydantic BaseModel with those fields.
  • The task receives a context object. Add ctx: Context as the second argument for run metadata, retry information, parent task outputs, and logging.
  • The return value becomes the task output. Return a value Pydantic can serialize, such as a Pydantic model or a dict.
  • Tasks can be sync or async. Hatchet tasks can be def or async def; the SDK recommends async for I/O-bound work.
⚠️

If your Celery tasks use positional arguments, *args, or **kwargs, you will need to define a Pydantic model that captures the expected fields. For tasks that take no meaningful input, use EmptyModel from hatchet_sdk. This input-model conversion is the main mechanical cost of the migration.

Serialization

Celery supports serializers such as JSON, pickle, YAML, and msgpack through task_serializer. Hatchet task inputs and outputs are serialized through Pydantic. If your Celery project uses pickle or another non-JSON serializer, make sure your task payloads can be represented in Pydantic’s JSON serialization mode. Values that Pydantic cannot serialize directly should be converted to JSON-compatible fields or handled with custom Pydantic serializers.

Step 4: Invoke tasks with input models

In Celery, a function decorated with @app.task is invoked through Celery’s task calling API. Calls such as .delay(...) pass arguments to the underlying task function, while apply_async(...) passes them through args= and kwargs=. In Hatchet, invoke the task with the input model you defined in Step 3. The same task input model is used whether you wait for the result or enqueue the task and continue.

Celery:

# Fire-and-forget
process_image.delay("https://example.com/photo.png", ["thumbnail"])
 
# With options
process_image.apply_async(
    args=["https://example.com/photo.png", ["thumbnail"]],
)

To migrate these calls, replace the task arguments with ImageInput(...) and call .run() or .aio_run() on the Hatchet task. By default, Hatchet waits for the task to finish and returns the typed result directly.

Hatchet:

What changed:

  • Task arguments move into the input model. Replace task arguments such as ("https://example.com/photo.png", ["thumbnail"]) with ImageInput(image_url=..., filters=...).
  • Invocation methods change. Replace .delay() / .apply_async() with .run() for sync code or .aio_run() for async code.
  • The default call waits for the result. Celery’s .delay() enqueues work and returns an AsyncResult immediately. Hatchet’s .run() and .aio_run() wait until the task completes and return the typed result directly.
  • Fire-and-forget uses wait_for_result=False. To enqueue without waiting, pass wait_for_result=False. This returns a TaskRunRef with the run ID and .result() / .aio_result() methods you can call later.
  • Delayed execution uses a separate API. Replace Celery countdown or eta with scheduled runs, covered in Step 7.

Step 5: Run a Hatchet worker

Celery workers are typically started from the CLI or through a process manager:

Celery:

celery -A tasks worker --loglevel=info --concurrency=4

In Hatchet, the migration artifact is a Python worker script that explicitly registers the tasks and workflows it can execute:

Hatchet:

During development, start the worker with the Hatchet CLI, which handles authentication and hot reloads on code changes:

hatchet worker dev

In production, run the worker script directly with your HATCHET_CLIENT_TOKEN set:

python worker.py

What changed:

  • The worker is defined in code. Replace Celery’s app-based task discovery with a worker script that registers executable tasks and workflows in the worker’s workflows=[...] list.
  • Worker capacity moves to slots. Replace Celery’s --concurrency=4 flag with the slots parameter on the Hatchet worker.
  • Startup differs by environment. Use hatchet worker dev during local development, then run the worker script through your process manager or container runtime in production.
  • Celery pool settings do not migrate directly. Celery supports pool types such as prefork, eventlet, gevent, and threads. Hatchet Python workers use the SDK’s sync/async execution model with worker slots, so CPU-bound work may need threads, subprocesses, or separate workers.

Step 6: Migrate retries and timeouts

Retries

Celery supports automatic retries for specified exception types and manual retries inside the task body.

Celery:

# Automatic: retry only on RequestError, with exponential backoff
@app.task(bind=True, autoretry_for=(RequestError,),
          retry_backoff=True, max_retries=5, retry_backoff_max=60)
def call_api(self, order_id: str) -> dict:
    return external_api_call(order_id)
 
# Manual: explicit retry with custom logic
@app.task(bind=True, max_retries=3)
def call_api_manual(self, order_id: str) -> dict:
    try:
        return external_api_call(order_id)
    except RequestError as exc:
        raise self.retry(exc=exc, countdown=30)

To migrate automatic retries, move the retry policy onto the Hatchet task decorator. Hatchet retries task failures when retries are configured, so model retryable failures by raising normal exceptions.

Hatchet:

To prevent retries for known permanent failures, raise NonRetryableException from the task body:

from hatchet_sdk.exceptions import NonRetryableException
 
# Inside a task: skip retry for a permanent failure
if response.status_code == 400:
    raise NonRetryableException("Bad request: do not retry")

See retry policies for the full set of retry options and behavior.

What changed:

  • Retry policy moves onto the task decorator. Replace Celery retry options such as max_retries, retry_backoff, and retry_backoff_max with Hatchet’s retries, backoff_factor, and backoff_max_seconds.
  • Review retry exception rules. Celery’s autoretry_for=(...) lists the exceptions that should trigger retries. In Hatchet, task failures are retryable when retries > 0; raise NonRetryableException for failures that should not retry.
  • Manual self.retry() logic needs redesign. Hatchet does not provide a direct self.retry() equivalent inside the task body; task-level retry behavior is managed by the Hatchet engine.

Timeouts

Celery:

@app.task(time_limit=30, soft_time_limit=25)
def long_task():
    ...

Celery defaults to no task time limit unless you configure one. Hatchet uses an execution_timeout for how long a task may run and schedule_timeout for how long a task may wait in the queue before being cancelled. For migration purposes, Hatchet’s execution_timeout is the closest match for Celery’s time_limit. Celery’s soft_time_limit has no exact Hatchet equivalent. If your task depends on soft time limits for cleanup, move that cleanup into explicit task logic during migration.

Hatchet:

from datetime import timedelta
 
@hatchet.task(execution_timeout=timedelta(seconds=30))
async def long_task(input, ctx):
    ...

What changed:

  • Hard task limits move to execution_timeout. Replace Celery time_limit=30 with execution_timeout=timedelta(seconds=30).
  • Queue wait time is controlled separately. Hatchet’s schedule_timeout controls how long a task may wait in the queue before being cancelled.
  • Soft timeout cleanup must be rewritten. Celery’s soft_time_limit has no exact Hatchet equivalent, so cleanup that depends on soft-timeout exceptions should become explicit task logic.
  • Review timeout defaults. Celery task time limits are not enabled unless configured; Hatchet has default timeout behavior, so check the timeout docs before relying on implicit behavior.

Step 7: Migrate delayed and periodic tasks

Delayed execution

In Celery, delayed execution is configured at the call site with countdown or eta on apply_async:

# Run 5 minutes from now
process_image.apply_async(args=["https://example.com/photo.png", ["blur"]], countdown=300)
 
# Run at a specific time
from datetime import datetime, timedelta, timezone
process_image.apply_async(
    args=["https://example.com/photo.png", ["blur"]],
    eta=datetime.now(timezone.utc) + timedelta(hours=1),
)
⚠️

Celery’s documentation warns that ETA/countdown tasks remain in worker memory until their scheduled execution time, and recommends using short delays rather than scheduling far into the future.

When migrating those call sites to Hatchet, delayed execution moves to the scheduled run API. Scheduled runs are persisted by the Hatchet engine instead of being held in worker memory, which makes longer delays feasible:

What changed:

  • countdown / eta becomes scheduled runs. Replace apply_async(..., countdown=...) or apply_async(..., eta=...) with task.aio_schedule(run_at, input).
  • The task input shape stays the same. Scheduled runs use the same ImageInput(...) model as immediate .aio_run(...) calls.
  • The worker no longer holds the delay. Hatchet persists the scheduled run in the engine until it is ready to execute.

Periodic tasks

In Celery, recurring schedules are usually defined in beat_schedule and executed by a separate Celery Beat process:

# celeryconfig.py
from celery.schedules import crontab
 
beat_schedule = {
    "daily-report": {
        "task": "tasks.generate_report",
        "schedule": crontab(hour=9, minute=0),
    },
}

In production, Beat may be run through your process supervisor, container runtime, or directly from the CLI:

celery -A tasks beat  # must run exactly one instance

In Hatchet, cron triggers are declared on the task with the on_crons parameter, which accepts a list of cron expressions managed by the Hatchet engine. When migrating periodic tasks to Hatchet, convert Beat schedules that use Celery’s crontab(...) helper into on_crons entries:

What changed:

  • Celery Beat is removed. You do not run a separate celery beat process or ensure that only one Beat instance is active.
  • The schedule moves onto the task. Replace the beat_schedule entry with on_crons=["0 9 * * *"].
  • Celery crontab(...) becomes a cron expression. Use standard 5-field or 6-field cron syntax.
  • Non-crontab schedules need review. Interval, solar, or custom Beat schedules may not convert directly to a cron expression.
  • Runtime schedule management moves to the cron client. If your application creates, lists, or deletes schedules dynamically, use the hatchet.cron client.

Step 8: Migrate chains, groups, and chords

This is the biggest conceptual change in the migration. Celery Canvas builds tasks at the call site. In contrast, Hatchet DAG workflows define the dependency graph up front in the workflow definition. Understanding Hatchet’s workflow orchestration model will make the rest of this section easier to follow.

Celery chain to Hatchet DAG

Celery:

from celery import chain
 
pipeline = chain(
    validate.s(order_id),
    charge.s(),
    fulfill.s(),
    notify.s(),
)
pipeline.apply_async()

In a Celery chain, each task’s return value is passed as the first argument to the next task. To migrate a Celery chain, turn each task in the chain into a task in the same Hatchet workflow, then express the order of execution with parents. The first task has no parent, the second task depends on the first, and each later task depends on the task that came before it.

Hatchet:

What changed:

  • Dependencies are declared on the task, not at the call site. parents=[validate] means “run after validate finishes.”
  • Parent outputs are accessed explicitly via ctx.task_output(parent_task), not passed as positional arguments.
  • The workflow is triggered as a unit. Replace pipeline.apply_async() with await order_pipeline.aio_run(OrderInput(...)). You do not chain individual task calls.

Celery group to parallel DAG tasks

A Celery group runs multiple tasks in parallel. If the parallel tasks are known when you define the workflow, migrate them to Hatchet as DAG tasks with the same parent, or with no parents if they can start immediately.

Celery:

from celery import group
 
checks = group(
    check_inventory.s(order_id),
    check_fraud.s(order_id),
)
checks.apply_async()

To migrate this pattern, define both tasks in the same Hatchet workflow without making one depend on the other. If the Celery code also waits for and combines the group results, that is an additional aggregation step, so add a downstream convergence task with the parallel tasks as parents.

Hatchet:

What changed:

  • Parallelism moves into the workflow definition. Tasks with no dependency between them can run concurrently.
  • The call site starts the workflow, not a group(...). You trigger the workflow as a unit instead of constructing a Canvas group dynamically.
  • Aggregation may become a convergence task. If the Celery code waits for group results and combines them, add a downstream task with the parallel tasks as parents.
  • Runtime fan-out is a different pattern. If your Celery group is built from a runtime list, such as group(process.s(item) for item in items), migrate that separately using child spawning.

Dynamic Celery groups to child spawning

Some Celery groups are built from a list that is only known at runtime:

Celery:

from celery import group
 
items = get_items_for_order(order_id)
result = group(process_item.s(item_id) for item_id in items)()

This is different from a static group of known tasks. A Hatchet DAG is defined ahead of time, so it is not the right fit when the number of child tasks depends on runtime input. To migrate this pattern, use child spawning. The parent task receives the list, spawns one child run per item using aio_run_many, and collects the results:

What changed:

  • Build the fan-out inside a parent task. Instead of constructing a group(...) at the call site, the parent task computes the runtime list and calls aio_run_many.
  • Each item becomes a child run. The child task or workflow receives one item from the runtime list.
  • Collect results where you spawn the children. Replace GroupResult handling with the results returned to the parent task by aio_run_many.

Celery chord to DAG convergence

A Celery chord runs a group of tasks in parallel and then runs a callback after every task in the group completes. When the parallel tasks are known ahead of time, migrate this pattern to a Hatchet DAG convergence task with multiple parents.

Celery:

from celery import chord
 
result = chord(
    [fetch_a.s(order_id), fetch_b.s(order_id), fetch_c.s(order_id)]
)(aggregate.s())

The aggregate callback receives a list of results from the group. To migrate a static chord, make the aggregate task depend on each parallel task and read each parent output explicitly.

Hatchet:

@workflow.task(parents=[fetch_a, fetch_b, fetch_c])
async def aggregate(input, ctx):
    a = ctx.task_output(fetch_a)
    b = ctx.task_output(fetch_b)
    c = ctx.task_output(fetch_c)
    ...

What changed:

  • Fan-in is declared with multiple parents. A task with parents=[fetch_a, fetch_b, fetch_c] runs only after all listed parents complete.
  • Parent outputs are accessed individually. Instead of receiving a list of group results as a callback argument, the aggregate task calls ctx.task_output(parent) for each parent.
  • Runtime-sized chords need child spawning. If your Celery chord is built from a runtime-sized group, such as chord([fetch.s(url) for url in urls])(aggregate.s()), migrate it with child spawning and explicit result collection rather than a static DAG convergence task.

Step 9: Replace result backend and Flower monitoring

In Celery, result retrieval and task-state inspection depend on a configured result backend. If your application calls AsyncResult.get() or checks AsyncResult.state, migrate those patterns to Hatchet’s result-returning invocation methods, TaskRunRef, or the runs client.

Flower is a separate cleanup step. Since Hatchet records run status, logs, retry attempts, timing, and workflow relationships in the dashboard, Flower is not needed for basic runtime visibility after migration is complete.

Consider the following Celery snippet that enqueues a task, retrieves its result, stores a task ID for later lookup, and checks task state:

Celery:

# Wait for the result
async_result = process_image.delay(
    "https://example.com/photo.png",
    ["thumbnail"],
)
output = async_result.get(timeout=10)
print(output["processed_url"])
 
# Fire-and-forget, then retrieve later
async_result = process_image.apply_async(
    args=["https://example.com/photo.png", ["blur"]],
)
task_id = async_result.id
 
# Check task state later
state = async_result.state
print(state)  # PENDING, SUCCESS, FAILURE, RETRY, or REVOKED
 
# Retrieve the result when ready
output = async_result.get(timeout=10)
print(output["processed_url"])

If your application branches on Celery task states, review that logic during migration instead of renaming states one-for-one. For example, Celery only reports STARTED when track_started=True or task_track_started is configured. In Hatchet, logic that depends on STARTED usually maps to checking for a RUNNING run status.

When migrating result-handling code to Hatchet, first decide whether the caller needs the result immediately or only needs a run reference. Use .run() / .aio_run() when the caller should wait for the result. Use wait_for_result=False when the caller should enqueue the work and inspect or retrieve the run later.

Hatchet:

What changed:

  • Result retrieval is direct by default. Replace AsyncResult.get() with .run() or .aio_run() when the caller should wait for the task result.
  • Fire-and-forget returns a run reference. Use wait_for_result=False when the caller should enqueue work and continue. The returned TaskRunRef includes .workflow_run_id and .result() / .aio_result().
  • State inspection moves to the runs client. Replace AsyncResult.state checks with hatchet.runs.aio_get_status(workflow_run_id) when application code needs run status.
  • Flower becomes deployment cleanup. Once the migrated workloads no longer run through Celery, remove the Flower process and use Hatchet’s dashboard for run visibility.

Step 10: Migrate other Celery project surfaces

This section covers Celery features that often appear in production projects but do not have a one-to-one Hatchet equivalent. For each one, identify the Celery behavior your application depends on, then migrate it to the closest Hatchet pattern.

Queues and routing

Celery uses task_routes and named queues to control which workers handle which tasks:

Celery:

# celeryconfig.py
task_routes = {
    "tasks.process_image": {"queue": "image-processing"},
    "tasks.send_email": {"queue": "notifications"},
}
celery -A tasks worker -Q image-processing

In Hatchet, routing is handled by registering specific tasks on each worker. To migrate your Celery routes, remove the queue route for that task and register the migrated Hatchet task on the worker that should run it.

Hatchet:

What changed:

  • task_routes and queue-bound workers become worker registration. Replace Celery queue routing with Hatchet workers that register the tasks and workflows they can execute in workflows=[...].
  • Advanced assignment uses worker labels. If you need weighted routing or capability-based assignment, Hatchet supports worker affinity where tasks declare desired_worker_labels and workers advertise capabilities. This replaces exchange-based routing patterns.
  • This is a redesign, not a rename. Simple queue-per-worker routing becomes explicit worker registration. Complex routing rules should be evaluated against worker affinity during migration.

Signals, hooks, and progress updates

Celery provides signals for task lifecycle events. A common pattern is logging or alerting on failure. Celery also supports self.update_state(state="PROGRESS", meta={...}) for progress reporting within a running task.

Celery:

from celery.signals import task_failure
 
@task_failure.connect
def handle_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    notify_ops_team(task_id, exception)

Migrate Celery signals and progress reporting to Hatchet workflow-level hooks and structured logging. If the signal handles workflow-level success or failure, migrate it to a workflow hook. If it reports progress, move that into the task with ctx.log() or streaming:

Hatchet:

What changed:

  • task_failure / task_success signals become workflow hooks. @workflow.on_failure_task() runs after any task in the workflow fails. @workflow.on_success_task() runs after all tasks succeed. These are tasks within the workflow, not global signal handlers.
  • Human-readable progress moves to logs. Use ctx.log() to send progress messages visible in the dashboard.
  • Live progress data moves to streams. For real-time progress data that application code consumes, use ctx.put_stream() to push data to subscribers.
  • Per-task task_prerun / task_postrun signals have no direct equivalent. If your project uses these for setup or teardown around individual tasks, consider dependency injection. Other cross-cutting behavior may require a small redesign.

Cancellation

Celery cancels tasks with revoke() on an AsyncResult:

Celery:

result = process_image.delay("https://example.com/photo.png", ["blur"])
result.revoke()                    # cancel if still pending
result.revoke(terminate=True)      # also terminate if running

Hatchet supports task cancellation through the runs client, the dashboard, or concurrency strategies like CANCEL_IN_PROGRESS. To cancel a run programmatically, use hatchet.runs.cancel(run_id). Running tasks should cooperate by checking ctx.is_cancelled:

Hatchet:

ref = await process_image.aio_run(
    ImageInput(image_url="https://example.com/photo.png", filters=["blur"]),
    wait_for_result=False,
)
 
# Cancel a run by ID
await hatchet.runs.aio_cancel(ref.workflow_run_id)
 
# Inside a task: cooperate with cancellation
@hatchet.task()
async def long_running(input, ctx):
    for batch in batches:
        if ctx.is_cancelled:
            return {"status": "cancelled"}
        await process_batch(batch)
    return {"status": "done"}

What changed:

  • result.revoke() becomes hatchet.runs.cancel(...) / hatchet.runs.aio_cancel(...). Celery cancels via the AsyncResult object. Hatchet cancels via the runs client or the dashboard using the workflow run ID.
  • Running tasks must cooperate. Celery’s revoke(terminate=True) sends a signal to the worker process. Hatchet sets a cancellation flag that the task checks with ctx.is_cancelled. Long-running tasks should check the flag so they can stop promptly and run any cleanup logic.
  • Concurrency strategies can cancel automatically. Hatchet’s CANCEL_IN_PROGRESS strategy cancels existing runs when a new run arrives for the same concurrency key. This has no Celery equivalent.

Priority

Celery priority is assigned when the task is enqueued, and the meaning of the numeric priority value depends on the broker.

Celery:

process_image.apply_async(
    args=["https://example.com/photo.png", ["blur"]],
    priority=9,  # broker-dependent scale
)

When migrating priority-sensitive task calls to Hatchet, replace broker-specific numeric priorities with Hatchet’s priority enum. Hatchet supports three priority levels: Priority.LOW, Priority.MEDIUM, and Priority.HIGH. You can set priority when triggering a run, or define a default priority on the workflow.

Hatchet:

from hatchet_sdk import Priority
 
ref = await process_image.aio_run(
    ImageInput(image_url="https://example.com/photo.png", filters=["blur"]),
    wait_for_result=False,
    priority=Priority.HIGH,
)

What changed:

  • Broker-specific numbers become Hatchet priority levels. Replace Celery numeric priorities with Priority.LOW, Priority.MEDIUM, or Priority.HIGH.
  • Priority can be set per run or as a workflow default. Use per-run priority when priority changes by call site; use a workflow default when all runs of that workflow should share the same priority.
  • Priority is scoped to a workflow type. Higher-priority runs of the same workflow are scheduled before lower-priority runs. Priority does not affect scheduling across different workflow types.
  • Priority-sensitive behavior needs validation. Celery broker priority and Hatchet scheduling priority are different models, so validate workflows that depend on precise ordering.

Testing migrated tasks

Celery projects sometimes use task_always_eager for testing, though Celery’s own documentation discourages it for unit tests:

Celery:

task_always_eager = True
 
result = process_image.delay("https://example.com/photo.png", ["thumbnail"])
assert result.get()["processed_url"] == "https://cdn.example.com/photo.png"

Hatchet tasks can be unit-tested without a running engine using .mock_run() (sync) or .aio_mock_run() (async), which execute the task function directly with a mock context:

Hatchet:

What changed:

  • task_always_eager is replaced by .mock_run() / .aio_mock_run(). These execute the task function locally without connecting to the Hatchet engine, similar to calling the function directly but with a mock context that provides retry_count, additional_metadata, and lifespan.
  • Integration tests use .run() / .aio_run(). To test against the full Hatchet engine, call .run() or .aio_run() with a running local or test instance.
  • See the unit testing example for patterns including sync, async, durable, and workflow-level mock testing.

Migration caveats to review

Before finishing the migration, review these areas that require design decisions rather than mechanical code changes:

  • Task function signatures. Every Hatchet task receives (input, ctx) instead of positional args. Creating Pydantic input models for tasks with varied argument signatures is typically the largest mechanical effort.
  • Canvas -> DAGs and child spawning. Celery Canvas composition must be restructured into Hatchet DAG workflows or child spawning. This is a design change, not a rename.
  • Manual retries. Celery’s self.retry() has no Hatchet equivalent. Restructure around NonRetryableException or handle retries within the task for specific calls.
  • Lifecycle hooks. Celery signals like task_prerun and task_postrun require workflow-level hooks, dependency injection, or restructuring in Hatchet.
  • Queue routing. Celery’s task_routes and named queues become explicit worker registration and optionally worker affinity (a different routing model).

Hatchet-native features to adopt after migration

Once your tasks are running on Hatchet, these features go beyond Celery parity:

  • Global rate limits: Key-based rate limiting enforced across all workers. Celery’s rate_limit is per-worker only.
  • Concurrency strategies: Per-key concurrency control with strategies such as cancel-in-progress or cancel-newest. Celery has no per-task concurrency control.
  • Durable sleep: Pause a workflow for minutes, hours, or days without holding a worker slot.
  • Durable event waits: Pause until an external event arrives. Useful for webhook-driven or human-in-the-loop workflows.
  • Durable tasks: Imperative workflow composition with checkpointing for long-running, stateful workflows.

None of these are needed for a basic migration. They become useful when your workloads grow from background tasks into durable workflows.

After the basic migration is working, use async APIs for I/O-bound tasks, lifespans to initialize shared resources once per worker, and dependency injection to pass common dependencies into tasks without wiring them manually in every function.

Before migrating every workload

Migration does not have to be all-or-nothing. Consider keeping specific workloads on Celery if:

  • They are simple fire-and-forget tasks that are already reliable and do not need orchestration or observability.
  • They depend on Celery ecosystem integrations (django-celery-beat, django-celery-results, or broker-specific features) that would be costly to replace.
  • They require extremely high throughput and do not need durable retention, observability, or workflow orchestration. Review Hatchet’s architecture and guarantees before migrating those workloads.

Final cleanup

After the last Celery task is migrated, remove the celery package declaration, including any Celery extras, from your dependency file and regenerate your lock file. Transitive dependencies like kombu and amqp will be removed automatically. Remove broker client packages (such as redis) only if no other code uses them. Delete celeryconfig.py and any Celery-specific environment variables (broker_url, result_backend, etc.) once nothing references them. Remove Celery worker, Beat, and Flower process definitions from your deployment configuration.

Next steps