diff --git a/README.md b/README.md
index 073b6dc..a1e7d2e 100644
--- a/README.md
+++ b/README.md
@@ -19,6 +19,10 @@ variable "nomad_addr" {
 
 job "example-job" {
 
+  meta = {
+    "nomad-pipeline.enabled" = "true"
+  }
+
   group "▶️" {
     count = 1
 
@@ -27,7 +31,7 @@ job "example-job" {
 
       config {
        image = "ghcr.io/hyperbadger/nomad-pipeline:main"
-        args = ["-init"]
+        args = ["agent", "init"]
      }
 
      env {
@@ -51,8 +55,8 @@ job "example-job" {
    count = 0 # <-- Important! nomad-pipeline will control the count
 
    meta = {
-      "nomad-pipeline/root" = "true" # <-- Indicates the starting task group
-      "nomad-pipeline/next" = "2-second-task-group"
+      "nomad-pipeline.root" = "true" # <-- Indicates the starting task group
+      "nomad-pipeline.next" = "2-second-task-group"
    }
 
    ...
@@ -92,7 +96,7 @@ job "example-job" {
 
 ***Using dependencies***
 
-To support running tasks in parallel and having a final task that runs at the end of all parallel tasks (eg. fan-out fan-in pattern), you can use the `nomad-pipeline/dependencies` tag.
+To support running tasks in parallel and having a final task that runs at the end of all parallel tasks (e.g. the fan-out fan-in pattern), you can use the `nomad-pipeline.dependencies` tag.
 
 ```mermaid
 graph TD;
@@ -110,7 +114,7 @@ group "E" {
   count = 0
 
   meta = {
-    "nomad-pipeline/dependencies" = "C, D"
+    "nomad-pipeline.dependencies" = "C, D"
  }
 
  ...
@@ -121,9 +125,9 @@ See [`dependencies.hcl`](examples/dependencies.hcl) for a more complete example.
 
 ***Using count***
 
-Another way to implement the fan-out fan-in pattern is to have multiple instances of a task group that can all pick up some work. Without nomad-pipeline, this is quite easy, you just set the [`count` stanza](https://www.nomadproject.io/docs/job-specification/group#count) on the task group. However, when using nomad-pipeline, the control of count is not in your hands. So if you want to set a count greater than 1, you can set the `nomad-pipeline/count` tag.
+Another way to implement the fan-out fan-in pattern is to have multiple instances of a task group that can all pick up some work. Without nomad-pipeline, this is quite easy: you just set the [`count` stanza](https://www.nomadproject.io/docs/job-specification/group#count) on the task group. However, when using nomad-pipeline, the count is not under your control. So if you want a count greater than 1, set the `nomad-pipeline.count` tag.
 
-> 💡 *Tip: The [`count` stanza](https://www.nomadproject.io/docs/job-specification/group#count) doesn't support variable interpolation since the config value is an integer and not a string - currently Nomad only support variable interpolation for string config values. This means that `count` can't be set from a `NOMAD_META_` variable, which is required for setting the `count` dynamically in a parameterized job. Using the `nomad-pipeline/count` tag allows you work around this. All `nomad-pipeline/*` tags interpolates variables, so you can use something like `"nomad-pipeline/count" = "${NOMAD_META_count}"`*
+> 💡 *Tip: The [`count` stanza](https://www.nomadproject.io/docs/job-specification/group#count) doesn't support variable interpolation because the config value is an integer and not a string; currently Nomad only supports variable interpolation for string config values. This means that `count` can't be set from a `NOMAD_META_` variable, which is required for setting the `count` dynamically in a parameterized job. The `nomad-pipeline.count` tag lets you work around this. All `nomad-pipeline.*` tags interpolate variables, so you can use something like `"nomad-pipeline.count" = "${NOMAD_META_count}"`*
 
 See [`examples/fan-out-fan-in.hcl`](examples/fan-out-fan-in.hcl) for a more complete example.
@@ -131,7 +135,7 @@ See [`examples/fan-out-fan-in.hcl`](examples/fan-out-fan-in.hcl) for a more comp
 
 ***Dynamic tasks***
 
 Dynamic tasks allows you to have a task that outputs more tasks 🤯. These tasks are then run as part of the job. This can open up the possibility to create some powerful pipelines. An example use case is for creating periodic splits of a longer task, if you have a task that processes 5 hours of some data, you could split the task into 5x 1 hour tasks and run them in parallel. This can be achieved by having an initial task that outputs the 5 split tasks as an output.
 
-To use dynamic tasks, set the `nomad-pipeline/dynamic-tasks` tag to a path/glob of where the task JSON's will be outputted. This path should be relative to [`NOMAD_ALLOC_DIR`](https://www.nomadproject.io/docs/runtime/environment#alloc).
+To use dynamic tasks, set the `nomad-pipeline.dynamic-tasks` tag to a path/glob where the task JSONs will be written. This path should be relative to [`NOMAD_ALLOC_DIR`](https://www.nomadproject.io/docs/runtime/environment#alloc).
 
 In the following example, the 1-generate-tasks first runs and outputs the 2-echo-hey task group which then gets launched after 1-generate-tasks finishes.
@@ -140,8 +144,8 @@ group "1-generate-tasks" {
   count = 0
 
   meta = {
-    "nomad-pipeline/root" = "true"
-    "nomad-pipeline/dynamic-tasks" = "tasks/*"
+    "nomad-pipeline.root" = "true"
+    "nomad-pipeline.dynamic-tasks" = "tasks/*"
  }
 
  task "generate-tasks" {
@@ -158,7 +162,7 @@ group "1-generate-tasks" {
      "Name": "2-echo-hey",
      "Count": 0,
      "Meta": {
-        "nomad-pipeline/root": "true"
+        "nomad-pipeline.root": "true"
      },
      "Tasks": [{
        "Name": "echo",
@@ -182,14 +186,14 @@ See [`dynamic-job.hcl`](examples/dynamic-job.hcl) for a more complete example.
 
 Nomad currently allows you to set a [`leader`](https://www.nomadproject.io/docs/job-specification/task#leader) at the task level. This allows you to gracefully shutdown all other tasks in the group when the leader task exits.
 
-Using the `nomad-pipeline/leader` tag, you can get the same functionality at the job level. You can set the tag on a task group, and when that task group completes, all other task groups will be gracefully shutdown.
+Using the `nomad-pipeline.leader` tag, you can get the same functionality at the job level. Set the tag on a task group, and when that task group completes, all other task groups will be gracefully shut down.
 
 ```hcl
 group "leader" {
   count = 0
 
   meta = {
-    "nomad-pipeline/leader" = "true"
+    "nomad-pipeline.leader" = "true"
  }
 
  ...
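The README changes above boil down to three user-visible switches: tag keys move from `nomad-pipeline/...` to `nomad-pipeline....`, every job now carries a job-level `"nomad-pipeline.enabled" = "true"` meta entry (added to each example here, which suggests an explicit opt-in), and the init container is invoked as `agent init` instead of `-init`. A minimal job wired up with the new conventions might look like the sketch below; the group names, the busybox tasks, and the `NOMAD_ADDR` value are illustrative and not taken from this diff, so treat it as a sketch rather than a tested job (the updated examples are the authoritative versions).

```hcl
# Sketch only - assembled from the README snippets above, not part of the diff.
job "minimal-pipeline" {
  type = "batch"

  meta = {
    "nomad-pipeline.enabled" = "true" # job-level opt-in added throughout this PR
  }

  # Control group: runs the nomad-pipeline agent with the new subcommand.
  group "▶️" {
    count = 1

    task "init" {
      driver = "docker"

      config {
        image = "ghcr.io/hyperbadger/nomad-pipeline:main"
        args  = ["agent", "init"] # previously ["-init"]
      }

      env {
        NOMAD_ADDR = "http://localhost:4646" # assumed; point the agent at your Nomad API
      }
    }
  }

  # Pipeline groups start at count = 0; nomad-pipeline controls the count.
  group "1-hello" {
    count = 0

    meta = {
      "nomad-pipeline.root" = "true"      # starting task group
      "nomad-pipeline.next" = "2-goodbye" # "." separator replaces the old "/"
    }

    task "hello" {
      driver = "docker"

      config {
        image   = "busybox:1.36" # illustrative workload
        command = "echo"
        args    = ["hello"]
      }
    }
  }

  group "2-goodbye" {
    count = 0

    task "goodbye" {
      driver = "docker"

      config {
        image   = "busybox:1.36"
        command = "echo"
        args    = ["goodbye"]
      }
    }
  }
}
```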
diff --git a/examples/dependencies.hcl b/examples/dependencies.hcl
index e8ebc08..5e4e853 100644
--- a/examples/dependencies.hcl
+++ b/examples/dependencies.hcl
@@ -23,6 +23,10 @@ job "dependencies" {
   datacenters = var.datacenters
   type = "batch"
 
+  meta = {
+    "nomad-pipeline.enabled" = "true"
+  }
+
   group "▶️" {
     task "init" {
       driver = "docker"
@@ -46,8 +50,8 @@ job "dependencies" {
     count = 0
 
     meta = {
-      "nomad-pipeline/root" = "true"
-      "nomad-pipeline/next" = "2-dependent"
+      "nomad-pipeline.root" = "true"
+      "nomad-pipeline.next" = "2-dependent"
    }
 
    task "normal" {
@@ -76,8 +80,8 @@ job "dependencies" {
    count = 0
 
    meta = {
-      "nomad-pipeline/root" = "true"
-      "nomad-pipeline/next" = "2-dependent"
+      "nomad-pipeline.root" = "true"
+      "nomad-pipeline.next" = "2-dependent"
    }
 
    task "normal" {
@@ -106,8 +110,8 @@ job "dependencies" {
    count = 0
 
    meta = {
-      "nomad-pipeline/root" = "true"
-      "nomad-pipeline/next" = "2-dependent"
+      "nomad-pipeline.root" = "true"
+      "nomad-pipeline.next" = "2-dependent"
    }
 
    task "normal" {
@@ -136,7 +140,7 @@ job "dependencies" {
    count = 0
 
    meta = {
-      "nomad-pipeline/dependencies" = "1a-task, 1b-task, 1c-task"
+      "nomad-pipeline.dependencies" = "1a-task, 1b-task, 1c-task"
    }
 
    task "dependent" {
diff --git a/examples/dynamic-job.hcl b/examples/dynamic-job.hcl
index 882751d..33c8aa5 100644
--- a/examples/dynamic-job.hcl
+++ b/examples/dynamic-job.hcl
@@ -23,6 +23,10 @@ job "dynamic" {
   datacenters = var.datacenters
   type = "batch"
 
+  meta = {
+    "nomad-pipeline.enabled" = "true"
+  }
+
   group "▶️" {
     task "init" {
       driver = "docker"
@@ -46,8 +50,8 @@ job "dynamic" {
     count = 0
 
     meta = {
-      "nomad-pipeline/root" = "true"
-      "nomad-pipeline/dynamic-tasks" = "tasks/*"
+      "nomad-pipeline.root" = "true"
+      "nomad-pipeline.dynamic-tasks" = "tasks/*"
    }
 
    task "generate-tasks" {
@@ -65,8 +69,8 @@ job "dynamic" {
          "Name": "${TASK_NAME}",
          "Count": 0,
          "Meta": {
-            "nomad-pipeline/root": "true",
-            "nomad-pipeline/next": "3-last"
+            "nomad-pipeline.root": "true",
+            "nomad-pipeline.next": "3-last"
          },
          "Tasks": [
            {
@@ -92,8 +96,8 @@ job "dynamic" {
          "Name": "3-last",
          "Count": 0,
          "Meta": {
-            "nomad-pipeline/root": "true",
-            "nomad-pipeline/dependencies": "2a-echo,2b-echo"
+            "nomad-pipeline.root": "true",
+            "nomad-pipeline.dependencies": "2a-echo,2b-echo"
          },
          "Tasks": [
            {
diff --git a/examples/fan-out-fan-in.hcl b/examples/fan-out-fan-in.hcl
index 458e65f..7f1e859 100644
--- a/examples/fan-out-fan-in.hcl
+++ b/examples/fan-out-fan-in.hcl
@@ -23,6 +23,10 @@ job "fan-out-fan-in" {
   datacenters = var.datacenters
   type = "batch"
 
+  meta = {
+    "nomad-pipeline.enabled" = "true"
+  }
+
   group "▶️" {
     task "init" {
       driver = "docker"
@@ -46,8 +50,8 @@ job "fan-out-fan-in" {
     count = 0
 
     meta = {
-      "nomad-pipeline/root" = "true"
-      "nomad-pipeline/next" = "2-do-work"
+      "nomad-pipeline.root" = "true"
+      "nomad-pipeline.next" = "2-do-work"
    }
 
    task "submit" {
@@ -76,8 +80,8 @@ job "fan-out-fan-in" {
    count = 0
 
    meta = {
-      "nomad-pipeline/count" = "5"
-      "nomad-pipeline/next" = "3-process-output"
+      "nomad-pipeline.count" = "5"
+      "nomad-pipeline.next" = "3-process-output"
    }
 
    scaling {
@@ -112,7 +116,7 @@ job "fan-out-fan-in" {
    count = 0
 
    meta = {
-      "nomad-pipeline/dependencies" = "2-do-work"
+      "nomad-pipeline.dependencies" = "2-do-work"
    }
 
    task "process" {
diff --git a/examples/happy-job.hcl b/examples/happy-job.hcl
index 3aa6ef3..f6083cb 100644
--- a/examples/happy-job.hcl
+++ b/examples/happy-job.hcl
@@ -23,6 +23,10 @@ job "happy" {
   datacenters = var.datacenters
   type = "batch"
 
+  meta = {
+    "nomad-pipeline.enabled" = "true"
+  }
+
   group "▶️" {
     count = 1
 
@@ -48,8 +52,8 @@ job "happy" {
     count = 0
 
     meta = {
-      "nomad-pipeline/root" = "true"
-      "nomad-pipeline/next" = "2-multi-task-group"
+      "nomad-pipeline.root" = "true"
+      "nomad-pipeline.next" = "2-multi-task-group"
    }
 
    task "normal" {
@@ -78,7 +82,7 @@ job "happy" {
    count = 0
 
    meta = {
-      "nomad-pipeline/next" = "3a-parallel,3b-parallel-i"
+      "nomad-pipeline.next" = "3a-parallel,3b-parallel-i"
    }
 
    task "first_task" {
@@ -102,7 +106,7 @@ job "happy" {
    count = 0
 
    meta = {
-      "nomad-pipeline/next" = "4-dependent"
+      "nomad-pipeline.next" = "4-dependent"
    }
 
    task "parallel" {
@@ -130,7 +134,7 @@ job "happy" {
    count = 0
 
    meta = {
-      "nomad-pipeline/next" = "3b-parallel-ii"
+      "nomad-pipeline.next" = "3b-parallel-ii"
    }
 
    task "parallel" {
@@ -158,7 +162,7 @@ job "happy" {
    count = 0
 
    meta = {
-      "nomad-pipeline/next" = "4-dependent"
+      "nomad-pipeline.next" = "4-dependent"
    }
 
    task "parallel" {
@@ -188,7 +192,7 @@ job "happy" {
    meta = {
      # BUG: when whole job is restarted, it will not wait for this task group,
      # 4-dependent will run as soon as 3b-parallel-i finishes
-      "nomad-pipeline/dependencies" = "3b-parallel-ii"
+      "nomad-pipeline.dependencies" = "3b-parallel-ii"
    }
 
    task "dependent" {
diff --git a/examples/leader-task-group.hcl b/examples/leader-task-group.hcl
index 4c02050..b8926c8 100644
--- a/examples/leader-task-group.hcl
+++ b/examples/leader-task-group.hcl
@@ -23,6 +23,10 @@ job "leader-task-group" {
   datacenters = var.datacenters
   type = "batch"
 
+  meta = {
+    "nomad-pipeline.enabled" = "true"
+  }
+
   group "▶️" {
     task "init" {
       driver = "docker"
@@ -46,8 +50,8 @@ job "leader-task-group" {
     count = 0
 
     meta = {
-      "nomad-pipeline/root" = "true"
-      "nomad-pipeline/leader" = "true"
+      "nomad-pipeline.root" = "true"
+      "nomad-pipeline.leader" = "true"
    }
 
    task "some-process" {
@@ -75,7 +79,7 @@ job "leader-task-group" {
    count = 0
 
    meta = {
-      "nomad-pipeline/root" = "true"
+      "nomad-pipeline.root" = "true"
    }
 
    task "forever-run" {
diff --git a/pkg/controller/pipeline.go b/pkg/controller/pipeline.go
index 306d82a..b1aebd2 100644
--- a/pkg/controller/pipeline.go
+++ b/pkg/controller/pipeline.go
@@ -18,16 +18,20 @@ import (
 )
 
 const (
-	countTag           = "nomad-pipeline/count"
-	dependenciesTag    = "nomad-pipeline/dependencies"
-	dynamicMemoryMBTag = "nomad-pipeline/dynamic-memory-mb"
-	dynamicTasksTag    = "nomad-pipeline/dynamic-tasks"
-	leaderTag          = "nomad-pipeline/leader"
-	nextTag            = "nomad-pipeline/next"
-	rootTag            = "nomad-pipeline/root"
+	TagPrefix          = "nomad-pipeline"
+	TagEnabled         = TagPrefix + ".enabled"
+	TagCount           = TagPrefix + ".count"
+	TagDependencies    = TagPrefix + ".dependencies"
+	TagDynamicMemoryMB = TagPrefix + ".dynamic-memory-mb"
+	TagDynamicTasks    = TagPrefix + ".dynamic-tasks"
+	TagLeader          = TagPrefix + ".leader"
+	TagNext            = TagPrefix + ".next"
+	TagRoot            = TagPrefix + ".root"
 
 	// internal tags, not meant to be set by user
-	_parentTask = "nomad-pipeline/_parent-task"
+	TagInternalPrefix = TagPrefix + ".internal"
+	TagParentTask     = TagInternalPrefix + ".parent-task"
+	TagParentPipeline = TagInternalPrefix + ".parent-pipeline"
 )
 
 func i2p(i int) *int {
 	return &i
 }
@@ -168,7 +172,7 @@ func successState(state *nomad.TaskState) bool {
 	return true
 }
 
-func tgDone(allocs []*nomad.AllocationListStub, groups []string, success bool) bool {
+func TgDone(allocs []*nomad.AllocationListStub, groups []string, success bool) bool {
 	if len(groups) == 0 || len(allocs) == 0 {
 		return false
 	}
@@ -349,7 +353,7 @@ type PipelineController struct {
 }
 
 func NewPipelineController(cPath string) *PipelineController {
-	dc := PipelineController{
+	pc := PipelineController{
 		JobID:     os.Getenv("NOMAD_JOB_ID"),
 		GroupName: os.Getenv("NOMAD_GROUP_NAME"),
 		TaskName:  os.Getenv("NOMAD_TASK_NAME"),
@@ -364,19 +368,19 @@ func NewPipelineController(cPath string) *PipelineController {
 		log.Fatalf("error creating client: %v", err)
 	}
 
-	dc.Nomad = nClient
-	dc.JobsAPI = nClient.Jobs()
-	dc.AllocsAPI = nClient.Allocations()
+	pc.Nomad = nClient
+	pc.JobsAPI = nClient.Jobs()
+	pc.AllocsAPI = nClient.Allocations()
 
-	log.Infof("getting job: %q", dc.JobID)
-	job, _, err := dc.JobsAPI.Info(dc.JobID, &nomad.QueryOptions{})
+	log.Infof("getting job: %q", pc.JobID)
+	job, _, err := pc.JobsAPI.Info(pc.JobID, &nomad.QueryOptions{})
 	if err != nil {
 		log.Fatalf("error getting job: %v", err)
 	}
 
-	dc.Job = job
+	pc.Job = job
 
-	return &dc
+	return &pc
 }
 
 func (pc *PipelineController) UpdateJob() error {
@@ -431,17 +435,17 @@ func (pc *PipelineController) ProcessTaskGroups(filters ...map[string]string) ([
 			Name: *tGroup.Name,
 		}
 
-		if next := lookupMetaTagStr(tGroup.Meta, nextTag); len(next) > 0 {
+		if next := lookupMetaTagStr(tGroup.Meta, TagNext); len(next) > 0 {
 			task.Next = split(next)
 		}
 
-		if dependencies := lookupMetaTagStr(tGroup.Meta, dependenciesTag); len(dependencies) > 0 {
+		if dependencies := lookupMetaTagStr(tGroup.Meta, TagDependencies); len(dependencies) > 0 {
 			task.Dependencies = split(dependencies)
 		}
 
 		tasks = append(tasks, task)
 
-		root, err := lookupMetaTagBool(tGroup.Meta, rootTag)
+		root, err := lookupMetaTagBool(tGroup.Meta, TagRoot)
 		if err != nil {
 			return nil, fmt.Errorf("error parsing root tag: %v", err)
 		}
@@ -451,7 +455,7 @@ func (pc *PipelineController) ProcessTaskGroups(filters ...map[string]string) ([
 
 		// not sure if this should be here
 		for _, t := range tGroup.Tasks {
-			mem, err := lookupMetaTagInt(t.Meta, dynamicMemoryMBTag)
+			mem, err := lookupMetaTagInt(t.Meta, TagDynamicMemoryMB)
 			if err != nil {
 				return nil, fmt.Errorf("error parsing dynamic memory tag: %v", err)
 			}
@@ -510,7 +514,7 @@ func (pc *PipelineController) ProcessTaskGroups(filters ...map[string]string) ([
 
 		nArgs := append([]string{"agent", "next"}, task.Next...)
 
-		if dynTasks := lookupMetaTagStr(tGroup.Meta, dynamicTasksTag); len(dynTasks) > 0 {
+		if dynTasks := lookupMetaTagStr(tGroup.Meta, TagDynamicTasks); len(dynTasks) > 0 {
 			nArgs = append([]string{"agent", "next", "--dynamic-tasks", dynTasks}, task.Next...)
 		}
 
@@ -539,7 +543,7 @@ func (pc *PipelineController) Init() bool {
 	}
 
 	if len(rTasks) == 0 {
-		log.Fatalf("couldn't find a root task group, need to set the root meta tag (%v)", rootTag)
+		log.Fatalf("couldn't find a root task group, need to set the root meta tag (%v)", TagRoot)
 	}
 
 	return pc.Next(rTasks, "")
@@ -553,7 +557,7 @@ func (pc *PipelineController) Wait(groups []string) {
 		log.Fatalf("error getting job allocations: %v", err)
 	}
 
-	if tgDone(jAllocs, groups, true) {
+	if TgDone(jAllocs, groups, true) {
 		log.Info("all dependent task groups finished successfully")
 		return
 	}
@@ -649,7 +653,7 @@ func (pc *PipelineController) Wait(groups []string) {
 			allocList = append(allocList, v)
 		}
 
-		if tgDone(allocList, groups, true) {
+		if TgDone(allocList, groups, true) {
 			log.Info("all dependent task groups finished successfully")
 			return
 		}
@@ -676,7 +680,7 @@ func (pc *PipelineController) Next(groups []string, dynTasks string) bool {
 		log.Fatalf("could not find current group (%v), this shouldn't happen!", pc.GroupName)
 	}
 
-	leader, err := lookupMetaTagBool(cGroup.Meta, leaderTag)
+	leader, err := lookupMetaTagBool(cGroup.Meta, TagLeader)
 	if err != nil {
 		log.Warnf("error parsing leader, default to false: %v", err)
 	}
@@ -732,12 +736,12 @@ func (pc *PipelineController) Next(groups []string, dynTasks string) bool {
 		for _, _tg := range tgs {
 			tg := _tg
 
-			tg.SetMeta(_parentTask, pc.GroupName)
+			tg.SetMeta(TagParentTask, pc.GroupName)
 			pc.Job.AddTaskGroup(&tg)
 		}
 
 		filter := map[string]string{
-			_parentTask: pc.GroupName,
+			TagParentTask: pc.GroupName,
 		}
 
 		rTasks, err := pc.ProcessTaskGroups(filter)
@@ -746,7 +750,7 @@ func (pc *PipelineController) Next(groups []string, dynTasks string) bool {
 		}
 
 		if len(rTasks) == 0 {
-			log.Fatalf("no root task group found, atleast one task in dynamic tasks must have root meta tag (%v)", rootTag)
+			log.Fatalf("no root task group found, at least one task in dynamic tasks must have root meta tag (%v)", TagRoot)
 		}
 
 		groups = append(groups, rTasks...)
@@ -758,14 +762,14 @@ func (pc *PipelineController) Next(groups []string, dynTasks string) bool {
 			log.Warnf("could not find next group %v", group)
 			continue
 		}
-		if tgAllocated(jAllocs, []string{group}) && !tgDone(jAllocs, []string{group}, false) {
+		if tgAllocated(jAllocs, []string{group}) && !TgDone(jAllocs, []string{group}, false) {
 			log.Warnf("next group already has allocations, skipping trigger: %v", group)
 			continue
 		}
 
 		tg.Count = i2p(1)
 
-		count, err := lookupMetaTagInt(tg.Meta, countTag)
+		count, err := lookupMetaTagInt(tg.Meta, TagCount)
 		if err != nil {
 			log.Warn("error parsing count tag, defaulting to 1: %v", err)
 			count = 1
@@ -775,7 +779,7 @@ func (pc *PipelineController) Next(groups []string, dynTasks string) bool {
 		}
 	}
 
-	if pc.TaskName == "init" || tgDone(jAllocs, []string{pc.GroupName}, true) {
+	if pc.TaskName == "init" || TgDone(jAllocs, []string{pc.GroupName}, true) {
 		cGroup.Count = i2p(0)
 	}
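Beyond the `/`-to-`.` rename, the controller changes deliberately widen the package surface: the tag names and `tgDone` become exported (`TagRoot`, `TagNext`, `TgDone`, and friends), so other packages or tooling can reuse them instead of hard-coding `nomad-pipeline.*` strings and re-implementing allocation checks. The sketch below shows roughly what that enables; it assumes the module path `github.com/hyperbadger/nomad-pipeline` and the package name `controller` (neither is spelled out in this diff), and the job and group names are illustrative.

```go
// Sketch only: consuming the newly exported identifiers from outside pkg/controller.
package main

import (
	"fmt"
	"log"

	nomad "github.com/hashicorp/nomad/api"

	"github.com/hyperbadger/nomad-pipeline/pkg/controller" // assumed module path
)

func main() {
	client, err := nomad.NewClient(nomad.DefaultConfig())
	if err != nil {
		log.Fatalf("error creating client: %v", err)
	}

	// Tag names are exported constants now, so callers can build pipeline
	// metadata without duplicating the "nomad-pipeline.*" string literals.
	meta := map[string]string{
		controller.TagRoot: "true",
		controller.TagNext: "2-second-task-group",
	}
	fmt.Println("pipeline meta:", meta)

	// TgDone is exported as well, so external tooling can ask whether a set of
	// task groups in a pipeline job has finished successfully.
	allocs, _, err := client.Jobs().Allocations("example-job", false, nil)
	if err != nil {
		log.Fatalf("error listing allocations: %v", err)
	}

	if controller.TgDone(allocs, []string{"2-second-task-group"}, true) {
		fmt.Println("group finished successfully")
	}
}
```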