Creating a Workflow
The simplest way to define a workflow is by using the worker.RegisterWorkflow
method. This method is passed the workflow definition, which includes triggers for the workflow and the steps that the workflow should execute. For example, to trigger a workflow on the user:created
event, you can do the following:
w.RegisterWorkflow(
&worker.WorkflowJob{
Name: "simple-workflow",
Description: "Simple one-step workflow.",
On: worker.Event("user:created"),
Steps: []*worker.WorkflowStep{
worker.Fn(func(ctx worker.HatchetContext) error {
return nil
}),
},
},
)
Supported Workflow Triggers
worker.Event
- triggers a workflow when an event is receivedworker.Events
- triggers a workflow when any of the given events are receivedworker.Cron
- triggers a workflow on a cron scheduleworker.Crons
- triggers a workflow from multiple cron schedulesworker.At
- triggers a workflow at a specific time. This is useful for one-off workflows. You can also schedule workflows using theAdmin
API - see here (opens in a new tab) for more information.
Supported Workflow Definitions
*worker.WorkflowJob
- a workflow that executes a series of stepsworker.Fn
- a single-step workflow (see below)
Single-Step Workflows
If your workflow is a single method that is only triggered via API, you can use worker.Fn
to define your workflow:
w.RegisterWorkflow(
worker.Fn(func(ctx worker.HatchetContext) error {
return nil
}),
)
Anonymous functions will be given an auto-generated name based on the package and parent function name. To avoid ugly auto-generated names, you can use SetName
on the worker.Fn
struct:
w.RegisterWorkflow(
worker.Fn(func(ctx worker.HatchetContext) error {
return nil
}).SetName("post-user-create"), // this workflow will be named "post-user-create"
)
Multi-Step Workflows
Hatchet supports generating multi-step workflows by specifying parent steps as dependencies. Any parents declared as dependencies will be executed before the current step, and their output data will be available to the current step. For example, the following workflow declares two steps, step-one
and step-two
. step-two
depends on step-one
, so step-one
will be executed first, and its output will be passed to step-two
:
type stepOneOutput struct {
Message string `json:"message"`
}
err = w.RegisterWorkflow(
&worker.WorkflowJob{
Name: "two-step-workflow",
Description: "This is an example two-step workflow.",
On: worker.Events("user:create"),
Steps: []*worker.WorkflowStep{
worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
input := &userCreateEvent{}
ctx.WorkflowInput(input) // typically you would handle this error
return &stepOneOutput{
Message: "Username is: " + input.Username,
}, nil
}).SetName("step-one"),
worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
input := &stepOneOutput{}
ctx.StepOutput("step-one", input) // typically you would handle this error
return &stepOneOutput{
Message: "Above message is: " + input.Message,
}, nil
}).SetName("step-two").AddParents("step-one"), // note the usage of `AddParents`
},
},
)
Getting Access to the Input Data
You can get access to the workflow's input data, such as the event data or other specified input data, by using the WorkflowInput
method on the HatchetContext
. For example, given the following event:
type MyEvent struct {
Name string `json:"name"`
}
You can get access to the event data by doing the following:
func FirstStep(ctx worker.HatchetContext) error {
event := &MyEvent{}
err := ctx.WorkflowInput(event)
if err != nil {
return err
}
fmt.Println("got event: ", event.Name)
return nil
}
Step Function Signatures
Step functions must always accept a worker.HatchetContext
as the first argument (or alternatively, context.Context
), and must return an error
as the last return value. They can optionally return a value, which must be a pointer to a struct. At the moment, the following are valid step functions:
func (ctx worker.HatchetContext) error
func (ctx worker.HatchetContext) (*myOutput, error)
Why pointers to structs? We use JSON marshalling/unmarshalling under the hood, and pointers to structs are the most predictable way to marshal and unmarshal values. You can use
json
tags andMarshalJSON
+UnmarshalJSON
methods to customize the marshalling/unmarshalling behavior.
Services
Services are a way to logically group workflows into different categories. For example, you may have a user
service that contains all workflows related to users. You can define a service by using the worker.NewService
method. For example, to define a user
service, you can do the following:
userService := w.NewService("user")
userService.RegisterWorkflow(
&worker.WorkflowJob{
Name: "post-user-sign-up",
On: worker.Event("user:created"),
Description: "Workflow that executes after a user signs up.",
Timeout: "60s",
Steps: []*worker.WorkflowStep{
{
Function: func(ctx context.Context) error {
fmt.Println("running post-user sign up")
return nil
},
},
},
},
)
While this is mostly a convenience method at the moment, we plan to add more features to services in the future, like service-level metrics and service-level retries.
Concurrency Limits and Fairness
Note: this feature is currently in beta, and currently only supports a concurrency strategy which terminates the oldest running workflow run to make room for the new one. This will be expanded in the future to support other strategies.**
By default, there are no concurrency limits for Hatchet workflows. Workflow runs are immediately executed as soon as they are triggered (by an event, cron, or schedule). However, you can enforce a concurrency limit by setting the Concurrency
field on the WorkflowJob
struct. You can use worker.Concurrency
and pass in a function with a signature func (ctx worker.HatchetContext) (string, error)
. This function returns a concurrency group key, which is a string that is used to group concurrent executions. For example, the following workflow will only allow 5 concurrent executions for any workflow execution of concurrency-limit
, since the key is statically set to my-key
:
func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
return "my-key", nil
}
err = w.RegisterWorkflow(
&worker.WorkflowJob{
Name: "concurrency-limit",
On: worker.Events("concurrency-test-event"),
Description: "This limits concurrency to 1 run at a time.",
Concurrency: worker.Concurrency(getConcurrencyKey).MaxRuns(1),
Steps: []*worker.WorkflowStep{
// your steps here...
},
},
)
Use-Case: Enforcing Per-User Concurrency Limits
You can use the custom concurrency function to enforce per-user concurrency limits. For example, the following workflow will only allow 1 concurrent execution per user:
type MyUser struct {
UserId string `json:"user_id"`
}
func getConcurrencyKey(ctx worker.HatchetContext) (string, error) {
event := &MyEvent{}
err := ctx.WorkflowInput(event)
if err != nil {
return "", err
}
return event.UserId, nil
}
err = w.RegisterWorkflow(
&worker.WorkflowJob{
Name: "concurrency-limit-per-user",
On: worker.Events("concurrency-test-event"),
Description: "This limits concurrency to 1 run at a time per user.",
Concurrency: worker.Concurrency(getConcurrencyKey).MaxRuns(1),
Steps: []*worker.WorkflowStep{
// your steps here...
},
},
)
Cron Schedules
You can declare a cron schedule by passing worker.Cron
to the worker.RegisterWorkflow
method. For example, to trigger a workflow every 5 minutes, you can do the following:
w.RegisterWorkflow(
&worker.WorkflowJob{
Name: "my-cron-job",
On: worker.Cron("*/5 * * * *"),
Description: "Cron workflow example.",
Timeout: "60s",
Steps: []*worker.WorkflowStep{
{
Function: func(ctx context.Context) error {
fmt.Println("triggered at:", time.Now())
return nil
},
},
},
},
)
Middleware
You can define middleware that will be executed before and after each step function. Middleware functions have the following signature:
func(ctx context.Context, next func(context.Context) error) error
You can register this middleware globally (at the worker level) or at the service level, using worker.Use
and service.Use
, respectively. For example, to define a middleware that logs the start and end of each step function, you can do the following:
w.Use(func(ctx context.Context, next func(context.Context) error) error {
// time the function duration
start := time.Now()
err := next(ctx)
duration := time.Since(start)
fmt.Printf("step function took %s\n", duration)
return err
})
You can also use the middleware to add values to the context. For example:
w.Use(func(ctx context.Context, next func(context.Context) error) error {
err := next(context.WithValue(ctx, "testkey", "testvalue"))
if err != nil {
return fmt.Errorf("error in middleware: %w", err)
}
return nil
})
Re-using Actions
If you have a common set of steps that you want to re-use across multiple workflows, you can define use RegisterAction
on either a service or a worker. For example, to define a send-email
action:
testSvc := w.NewService("test")
err = testSvc.RegisterAction(StepOne, worker.WithActionName("step-one"))
if err != nil {
panic(err)
}
err = testSvc.RegisterWorkflow(
testSvc.Call("step-one"),
)
Note the usage of testSvc.Call("step-one")
to invoke a single-step action.