Skip to content

Handler to configure DAG in Lambda for AWS Step Functions State Machine

License

Notifications You must be signed in to change notification settings

mashiike/lambdag

Repository files navigation

lambdag

Documentation GitHub go.mod Go version (branch) GitHub tag (latest SemVer) Github Actions test License

DAG Execution for AWS StepFunctions with Lambda Function

Motivation :

  • Amazon State Language (ASL) data flow (InputPath, ResultPath,ResultSelector,etc...) was too difficult.
  • AWS StepFunctions machine retry and error catching mechanisms are great!
  • In the end, for difficult processes, it is tempting to rely on Lambda functions, but it is hard to create a Lambda function for each processing unit.
  • Airflow's DAG was good, I want to write something like DAG in Golang

Ideas :

Let the AWS StepFunctions state machine concentrate on things outside of the pipeline logic, such as retries, SNS notifications, etc., and contain the pipeline logic in a single Lambda function. Then, the state machine should focus on invoking a single Lambda function.

Usage

So here is the simplest StateMachine example

simplest

definition.asl.json

and, The Lambda function that this state machine invokes is written as follows

package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/mashiike/lambdag"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)
	defer cancel()
	dag, err := lambdag.NewDAG("SampleDAG")
	if err != nil {
		log.Fatal(err)
	}
	task1, err := dag.NewTask("task1", lambdag.TaskHandlerFunc(func(ctx context.Context, tr *lambdag.TaskRequest) (interface{}, error) {
		return `"task1 success"`, nil
	}))
	if err != nil {
		log.Fatal(err)
	}
	task2, err := dag.NewTask("task2", lambdag.TaskHandlerFunc(func(ctx context.Context, tr *lambdag.TaskRequest) (interface{}, error) {
		return "task2 success", nil
	}))
	if err != nil {
		log.Fatal(err)
	}
	err = task1.SetDownstream(task2)
	if err != nil {
		log.Fatal(err)
	}
	lambdag.RunWithContext(ctx, os.Args[1:], dag)
}
graph LR
    task1("task1")
    task2("task2")

    task1-->task2
Loading

It is assumed to work with the runtime of the provided.al2

The build should look like this

$ GOOS=linux GOARCH=arm64 go build -o bootstrap _examples/src/main.go

To run the example in a Local environment using StepFunctions Local

$ cd _examples
$ make prepare
$ make build
$ make run/<execution_name>
$ make history/<execution_name>

Usage (for local development)

lambdag.RunWithContext(ctx, os.Args[1:], dag)

if local execution run as CLI

$ go run _examples/src/main.go 
Usage: SampleDAG <flags> <subcommand> <subcommand args>

Subcommands:
        commands         list all command names
        flags            describe all known top-level flags
        help             describe subcommands and their syntax
        render           rendering DAG
        serve            start a stub server for the lambda Invoke API

CLI has stub server for the lambda Invoke API

$ go run _examples/src/main.go serve
2022/06/17 13:51:55 [info] starting up with Stub lambda API http://:3001
aws lambda --endpoint http://localhost:3001 invoke --function-name SampleDAG --cli-binary-format raw-in-base64-out --payload '{"Comment":"this is dag run config"}' output.txt --log-type Tail --qualifier current

LICENSE

MIT License

Copyright (c) 2022 IKEDA Masashi

About

Handler to configure DAG in Lambda for AWS Step Functions State Machine

Resources

License

Stars

Watchers

Forks

Packages

No packages published