SDK Reference
Go SDK
Creating a Workflow

Creating a Workflow

The simplest way to define a workflow is by using the worker.RegisterWorkflow method. This method is passed the workflow definition, which includes triggers for the workflow and the steps that the workflow should execute. For example, to trigger a workflow on the user:created event, you can do the following:

w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name:        "simple-workflow",
        Description: "Simple one-step workflow.",
        On:          worker.Event("user:created"),
        Steps: []*worker.WorkflowStep{
            worker.Fn(func(ctx worker.HatchetContext) error {
                return nil
            }),
        },
    },
)

Supported Workflow Triggers

  • worker.Event - triggers a workflow when an event is received
  • worker.Events - triggers a workflow when any of the given events are received
  • worker.Cron - triggers a workflow on a cron schedule
  • worker.Crons - triggers a workflow from multiple cron schedules
  • worker.At - triggers a workflow at a specific time. This is useful for one-off workflows. You can also schedule workflows using the Admin API - see here (opens in a new tab) for more information.

Supported Workflow Definitions

  • *worker.WorkflowJob - a workflow that executes a series of steps
  • worker.Fn - a single-step workflow (see below)

Single-Step Workflows

If your workflow is a single method that is only triggered via API, you can use worker.Fn to define your workflow:

w.RegisterWorkflow(
    worker.Fn(func(ctx worker.HatchetContext) error {
        return nil
    }),
)

Anonymous functions will be given an auto-generated name based on the package and parent function name. To avoid ugly auto-generated names, you can use SetName on the worker.Fn struct:

w.RegisterWorkflow(
    worker.Fn(func(ctx worker.HatchetContext) error {
        return nil
    }).SetName("post-user-create"), // this workflow will be named "post-user-create"
)

Multi-Step Workflows

Hatchet supports generating multi-step workflows by specifying parent steps as dependencies. Any parents declared as dependencies will be executed before the current step, and their output data will be available to the current step. For example, the following workflow declares two steps, step-one and step-two. step-two depends on step-one, so step-one will be executed first, and its output will be passed to step-two:

type stepOneOutput struct {
    Message string `json:"message"`
}
 
err = w.RegisterWorkflow(
 
	&worker.WorkflowJob{
		Name:        "two-step-workflow",
		Description: "This is an example two-step workflow.",
        On:          worker.Events("user:create"),
		Steps: []*worker.WorkflowStep{
			worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
				input := &userCreateEvent{}
                ctx.WorkflowInput(input) // typically you would handle this error
 
				return &stepOneOutput{
					Message: "Username is: " + input.Username,
				}, nil
			}).SetName("step-one"),
			worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
				input := &stepOneOutput{}
				ctx.StepOutput("step-one", input) // typically you would handle this error
 
				return &stepOneOutput{
					Message: "Above message is: " + input.Message,
				}, nil
			}).SetName("step-two").AddParents("step-one"), // note the usage of `AddParents`
		},
	},
)

Getting Access to the Input Data

You can get access to the workflow's input data, such as the event data or other specified input data, by using the WorkflowInput method on the HatchetContext. For example, given the following event:

type MyEvent struct {
    Name string `json:"name"`
}

You can get access to the event data by doing the following:

func FirstStep(ctx worker.HatchetContext) error {
    event := &MyEvent{}
    err := ctx.WorkflowInput(event)
 
    if err != nil {
        return err
    }
 
    fmt.Println("got event: ", event.Name)
    return nil
}

Step Function Signatures

Step functions must always accept a worker.HatchetContext as the first argument (or alternatively, context.Context), and must return an error as the last return value. They can optionally return a value, which must be a pointer to a struct. At the moment, the following are valid step functions:

func (ctx worker.HatchetContext) error
func (ctx worker.HatchetContext) (*myOutput, error)

Why pointers to structs? We use JSON marshalling/unmarshalling under the hood, and pointers to structs are the most predictable way to marshal and unmarshal values. You can use json tags and MarshalJSON + UnmarshalJSON methods to customize the marshalling/unmarshalling behavior.

Services

Services are a way to logically group workflows into different categories. For example, you may have a user service that contains all workflows related to users. You can define a service by using the worker.NewService method. For example, to define a user service, you can do the following:

userService := w.NewService("user")
 
userService.RegisterWorkflow(
    &worker.WorkflowJob{
        Name:        "post-user-sign-up",
        On:          worker.Event("user:created"),
        Description: "Workflow that executes after a user signs up.",
        Timeout:     "60s",
        Steps: []*worker.WorkflowStep{
            {
                Function: func(ctx context.Context) error {
                    fmt.Println("running post-user sign up")
                    return nil
                },
            },
        },
    },
)

While this is mostly a convenience method at the moment, we plan to add more features to services in the future, like service-level metrics and service-level retries.

Concurrency Limits and Fairness

Note: this feature is currently in beta, and currently only supports a concurrency strategy which terminates the oldest running workflow run to make room for the new one. This will be expanded in the future to support other strategies.**

By default, there are no concurrency limits for Hatchet workflows. Workflow runs are immediately executed as soon as they are triggered (by an event, cron, or schedule). However, you can enforce a concurrency limit by setting the Concurrency field on the WorkflowJob struct. You can use worker.Concurrency and pass in a function with a signature func (ctx worker.HatchetContext) (string, error). This function returns a concurrency group key, which is a string that is used to group concurrent executions. For example, the following workflow will only allow 5 concurrent executions for any workflow execution of concurrency-limit, since the key is statically set to my-key:

func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
	return "my-key", nil
}
 
err = w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name:        "concurrency-limit",
        On:          worker.Events("concurrency-test-event"),
        Description: "This limits concurrency to 1 run at a time.",
        Concurrency: worker.Concurrency(getConcurrencyKey).MaxRuns(1),
        Steps: []*worker.WorkflowStep{
            // your steps here...
        },
    },
)

Use-Case: Enforcing Per-User Concurrency Limits

You can use the custom concurrency function to enforce per-user concurrency limits. For example, the following workflow will only allow 1 concurrent execution per user:

type MyUser struct {
    UserId string `json:"user_id"`
}
 
func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
	event := &MyEvent{}
    err := ctx.WorkflowInput(event)
 
    if err != nil {
        return "", err
    }
 
    return event.UserId, nil
}
 
err = w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name:        "concurrency-limit-per-user",
        On:          worker.Events("concurrency-test-event"),
        Description: "This limits concurrency to 1 run at a time per user.",
        Concurrency: worker.Concurrency(getConcurrencyKey).MaxRuns(1),
        Steps: []*worker.WorkflowStep{
            // your steps here...
        },
    },
)

Cron Schedules

You can declare a cron schedule by passing worker.Cron to the worker.RegisterWorkflow method. For example, to trigger a workflow every 5 minutes, you can do the following:

w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name:        "my-cron-job",
        On:          worker.Cron("*/5 * * * *"),
        Description: "Cron workflow example.",
        Timeout:     "60s",
        Steps: []*worker.WorkflowStep{
            {
                Function: func(ctx context.Context) error {
                    fmt.Println("triggered at:", time.Now())
                    return nil
                },
            },
        },
    },
)

Middleware

You can define middleware that will be executed before and after each step function. Middleware functions have the following signature:

func(ctx context.Context, next func(context.Context) error) error

You can register this middleware globally (at the worker level) or at the service level, using worker.Use and service.Use, respectively. For example, to define a middleware that logs the start and end of each step function, you can do the following:

w.Use(func(ctx context.Context, next func(context.Context) error) error {
    // time the function duration
    start := time.Now()
    err := next(ctx)
    duration := time.Since(start)
    fmt.Printf("step function took %s\n", duration)
    return err
})

You can also use the middleware to add values to the context. For example:

w.Use(func(ctx context.Context, next func(context.Context) error) error {
    err := next(context.WithValue(ctx, "testkey", "testvalue"))
 
    if err != nil {
        return fmt.Errorf("error in middleware: %w", err)
    }
 
    return nil
})

Re-using Actions

If you have a common set of steps that you want to re-use across multiple workflows, you can define use RegisterAction on either a service or a worker. For example, to define a send-email action:

testSvc := w.NewService("test")
 
err = testSvc.RegisterAction(StepOne, worker.WithActionName("step-one"))
 
if err != nil {
	panic(err)
}
 
err = testSvc.RegisterWorkflow(
	testSvc.Call("step-one"),
)

Note the usage of testSvc.Call("step-one") to invoke a single-step action.