Skip to content

Commit

Permalink
Merge pull request #11 from saantiaguilera/feature/sa/v2
Browse files Browse the repository at this point in the history
V2
  • Loading branch information
saantiaguilera committed May 16, 2022
2 parents 78301b8 + 8dcb186 commit 4efaed7
Show file tree
Hide file tree
Showing 129 changed files with 2,392 additions and 6,168 deletions.
9 changes: 2 additions & 7 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ jobs:
runs-on: ubuntu-latest
steps:

- name: Set up Go 1.13
- name: Set up Go 1.18
uses: actions/setup-go@v1
with:
go-version: 1.13
go-version: 1.18
id: go

- name: Check out code into the Go module directory
Expand All @@ -20,18 +20,13 @@ jobs:
run: |
go get -v -t -d ./...
go get golang.org/x/tools/cmd/goimports
go get golang.org/x/lint/golint
- name: Formatting
run: gofmt -l . | if [ $(grep -c -o -E ".*") -gt 0 ]; then return 1; fi

- name: Imports
run: |
`go list -f {{.Target}} golang.org/x/tools/cmd/goimports` -l . | if [ $(grep -c -o -E ".*") -gt 0 ]; then return 1; fi
- name: Lint
run: |
`go list -f {{.Target}} golang.org/x/lint/golint` -set_exit_status ./...
- name: Build
run: go build -v ./...
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea/
.DS_Store

*.prof
*.test
67 changes: 5 additions & 62 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Pipeline is a GPL3-licensed Go package for building, executing and representing

- API documentation and examples are available via [godoc](https://godoc.org/github.com/saantiaguilera/go-pipeline).
- The [examples](./examples) directory contains more elaborate example applications.
- No specific mocks are needed for testing, every element is completely decoupled and atomic. You can create your own ones however you deem fit.
- No specific mocks are needed for testing, every element is completely decoupled and atomic. You can New your own ones however you deem fit.

## API stability

Expand All @@ -25,7 +25,7 @@ You can import a version with a guaranteed stable API via http://gopkg.in/saanti

## Example

_The following graph creation, execution and representation can be found under the [examples](examples/static/cook_example/) directory._
_The following graph creation, execution and representation can be found under the [examples](examples/usages/cooking_a_recipe_pipeline) directory._

Imagine we are making a dish, we need to:
1. Put the eggs to boil and cut them.
Expand All @@ -36,65 +36,8 @@ Imagine we are making a dish, we need to:
6. Put the meat in the oven.
7. Serve when the meat and the salad are done.

This workflow is represented as such
This workflow is represented as such (with this same API, no need to draw it on your own)

![](examples/static/cook_example/template.svg)
![](examples/usages/cooking_a_recipe_pipeline/template.svg)

This workflow can be built and executed as such.
```go
// Complete stage. Its sequential because we can't serve
// before all the others are done.
graph := pipeline.CreateSequentialGroup(
// Concurrent stage, given we can do the salad / meat separately.
pipeline.CreateConcurrentGroup(
// This will be the salad flow.
pipeline.CreateSequentialGroup(
// Eggs and carrots can be operated concurrently too.
pipeline.CreateConcurrentGroup(
// Sequential stage for the eggs flow.
pipeline.CreateSequentialStage(
CreateBoilEggsStep(),
CreateCutEggsStep(),
),
// Another sequential stage for the carrots (eggs and carrots will be concurrent though!)
pipeline.CreateSequentialStage(
CreateWashCarrotsStep(),
CreateCutCarrotsStep(),
),
),
// This is sequential. When carrots and eggs are done, this will run.
pipeline.CreateSequentialStage(
CreateMakeSaladStep(),
),
),
// Another sequential stage for the meat (concurrently with salad)
pipeline.CreateSequentialGroup(
// If we end up cutting the meat, we can optimize it with the oven operation
pipeline.CreateConcurrentGroup(
// Conditional stage, the meat might be too big
pipeline.CreateConditionalStage(
pipeline.CreateSimpleStatement("is_meat_too_big", IsMeatTooBigForTheOven),
// True:
CreateCutMeatStep(),
// False:
nil,
),
pipeline.CreateSequentialStage(
CreateTurnOvenOnStep(),
),
),
pipeline.CreateSequentialStage(
CreatePutMeatInOvenStep(),
),
),
),
// When everything is done. Serve.
pipeline.CreateSequentialStage(
CreateServeStep(),
),
)

pipe := pipeline.CreatePipeline(CreateYourExecutor())
pipe.Run(graph, CreateYourInputContext())
```
_Note that, for showing purposes, this is all in a single function. You can easily decouple this into more atomic ones that take care of specific responsibilities (eg. making the salad)._
To build this, we simply need to create a step / unit of work for each given task and then "link" them however we want them to be traversed later in the graph. The graph creation can be seen [here](https://github.com/saantiaguilera/go-pipeline/blob/master/examples/usages/cooking_a_recipe_pipeline/main.go#L18)
66 changes: 66 additions & 0 deletions concurrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package pipeline

import (
"context"
"errors"
)

type (
ConcurrentStep[I, O any] struct {
steps []Step[I, O]
reduce reducer[O]
}

reducer[O any] func(context.Context, O, O) (O, error)
)

// NewConcurrentStep creates a step that will run each of the inner steps concurrently.
// The step will wait for all of the steps to finish before returning.
//
// If one of them fails, the step will wait until everyone finishes and after that return the first encountered error.
func NewConcurrentStep[I, O any](steps []Step[I, O], reduce reducer[O]) ConcurrentStep[I, O] {
return ConcurrentStep[I, O]{
steps: steps,
reduce: reduce,
}
}

func (c ConcurrentStep[I, O]) Draw(graph Graph) {
if len(c.steps) > 0 {
var forkSteps []GraphDrawer
for _, s := range c.steps {
forkSteps = append(forkSteps, c.newStepGraphActivity(s))
}

graph.AddConcurrency(forkSteps...)
}
}

func (c ConcurrentStep[I, O]) Run(ctx context.Context, in I) (O, error) {
if len(c.steps) == 0 {
return *new(O), errors.New("cannot run with empty concurrent steps")
}

mres, err := spawnAsync(c.steps, func(s Step[I, O]) (O, error) {
return s.Run(ctx, in)
})

if err != nil {
return *new(O), err
}

acc := mres[0]
for _, v := range mres[1:] {
acc, err = c.reduce(ctx, acc, v)
if err != nil {
return *new(O), err
}
}
return acc, nil
}

func (c ConcurrentStep[I, O]) newStepGraphActivity(drawable DrawableGraph) GraphDrawer {
return func(graph Graph) {
drawable.Draw(graph)
}
}
36 changes: 0 additions & 36 deletions concurrent_group.go

This file was deleted.

95 changes: 0 additions & 95 deletions concurrent_group_test.go

This file was deleted.

36 changes: 0 additions & 36 deletions concurrent_stage.go

This file was deleted.

Loading

0 comments on commit 4efaed7

Please sign in to comment.