User Guide
Features
Worker Assignment
Worker Affinity

Worker Affinity Assignment (Beta)

This feature is currently in beta and may be subject to change.

It is often desirable to assign workflows to specific workers based on certain criteria, such as worker capabilities, resource availability, or location. Worker affinity allows you to specify that a workflow should be assigned to a specific worker based on worker label state. Labels can be set dynamically on workers to reflect their current state, such as a specific model loaded into memory or specific disk requirements.

Specific steps can then specify desired label state to ensure that workflows are assigned to workers that meet specific criteria. If no worker meets the specified criteria, the step run will remain in a pending state until a suitable worker becomes available or the step is cancelled. (See Scheduling Timeouts)

Specifying Worker Labels

Labels can be set on workers when they are registered with the Hatchet service. Labels are key-value pairs that can be used to specify worker capabilities, resource availability, or other criteria that can be used to match workflows to workers. Values can be strings or numbers, and multiple labels can be set on a worker.

worker = hatchet.worker(
    "affinity-worker",
    labels={
        "model": "fancy-ai-model-v2",
        "memory": 512,
    },
)

Specifying Step Desired Labels

You can specify desired worker label state for specific steps in a workflow by setting the worker_labels property on the step definition. This property is an object where the keys are the label keys and the values are objects with the following properties:

  • value: The desired value of the label
  • comparator (default: EQUAL): The comparison operator to use when matching the label value.
    • EQUAL: The label value must be equal to the desired value
    • NOT_EQUAL: The label value must not be equal to the desired value
    • GREATER_THAN: The label value must be greater than the desired value
    • GREATER_THAN_OR_EQUAL: The label value must be greater than or equal to the desired value
    • LESS_THAN: The label value must be less than the desired value
    • LESS_THAN_OR_EQUAL: The label value must be less than or equal to the desired value
  • required (default: true): Whether the label is required for the step to run. If true, the step will remain in a pending state until a worker with the desired label state becomes available. If false, the worker will be prioritized based on the sum of the highest matching weights.
  • weight (optional, default: 100): The weight of the label. Higher weights are prioritized over lower weights when selecting a worker for the step. If multiple workers have the same highest weight, the worker with the highest sum of weights will be selected. Ignored if required is true.
@hatchet.step(
    desired_worker_labels={
        "model": {
            "value": "fancy-ai-model-v2",
            "weight": 10
        },
        "memory": {
            "value": 256,
            "required": True,
            "comparator": WorkerLabelComparator.GREATER_THAN,
        }
    },
)
async def step(self, context: Context):
    return {"worker": context.worker.id()}
⚠️

Use extra care when using worker affinity with sticky assignment HARD strategy. In this case, it is recommended to set desired labels on the first step of the workflow to ensure that the workflow is assigned to a worker that meets the desired criteria and remains on that worker for the duration of the workflow.

Dynamic Worker Labels

Labels can also be set dynamically on workers using the upsertLabels method. This can be useful when worker state changes over time, such as when a new model is loaded into memory or when a worker's resource availability changes.

    @hatchet.step(
        desired_worker_labels={
            "model": {
              "value": "fancy-vision-model",
              "weight": 10
            },
            "memory": {
                "value": 256,
                "required": True,
                "comparator": WorkerLabelComparator.GREATER_THAN,
            },
        },
    )
    async def step(self, context: Context):
        if context.worker.get_labels().get("model") != "fancy-vision-model":
            context.worker.upsert_labels({"model": "unset"})
            # DO WORK TO EVICT OLD MODEL / LOAD NEW MODEL
            evictModel()
            loadNewModel("fancy-vision-model")
            context.worker.upsert_labels({"model": "fancy-vision-model"})
 
        return {"worker": context.worker.id()}