Skip to content

Commit

Permalink
Merge pull request #1 from lefelys/feat/readiness-state
Browse files Browse the repository at this point in the history
feat: add readiness state
  • Loading branch information
lefelys authored Sep 18, 2020
2 parents 888d454 + d7cec4b commit d9a0002
Show file tree
Hide file tree
Showing 8 changed files with 397 additions and 52 deletions.
22 changes: 22 additions & 0 deletions dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type dependState struct {
parent State

finished chan struct{}
ready chan struct{}

sync.RWMutex
}
Expand Down Expand Up @@ -53,10 +54,31 @@ func (d *dependState) Wait() {
d.parent.Wait()
}

func (d *dependState) Ready() <-chan struct{} {
d.Lock()
defer d.Unlock()

if d.ready != nil {
// To avoid memory leaks - ready channel is created only once
return d.ready
}

d.ready = make(chan struct{})

go func() {
<-d.children.Ready()
<-d.parent.Ready()
close(d.ready)
}()

return d.ready
}

func (d *dependState) Err() (err error) {
if err = d.parent.Err(); err != nil {
return err
}

for _, states := range d.children.states {
if err = states.Err(); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions empty.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ func Empty() State { return emptyState{} }
func (e emptyState) Err() error { return nil }
func (e emptyState) Shutdown(_ context.Context) error { return nil }
func (e emptyState) Wait() {}
func (e emptyState) Ready() <-chan struct{} { return closedchan }
func (e emptyState) Value(_ interface{}) interface{} { return nil }
func (e emptyState) DependsOn(children ...State) State { return withDependency(e, children...) }
func (e emptyState) close() {}
Expand Down
5 changes: 5 additions & 0 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func ExampleWithShutdown() {

// send a signal that the shutdown is complete
tail.Done()

return
case <-ticker.C:
// some job
Expand All @@ -44,6 +45,7 @@ func ExampleWithShutdown() {
func ExampleWithShutdown_dependency() {
runJob := func(name string) State {
st, tail := WithShutdown()

go func() {
<-tail.End()

Expand Down Expand Up @@ -78,6 +80,7 @@ func ExampleWithShutdown_dependency() {
func ExampleWithShutdown_dependencyWrap() {
st1 := func() State {
st, tail := WithShutdown()

go func() {
<-tail.End()
fmt.Println("shutdown job 1")
Expand Down Expand Up @@ -213,6 +216,7 @@ func ExampleWithAnnotation_shutdown() {
func ExampleMerge() {
runJob := func(name string, duration time.Duration) State {
st, tail := WithShutdown()

go func() {
<-tail.End()

Expand All @@ -230,6 +234,7 @@ func ExampleMerge() {
st3 := runJob("job 3", 150*time.Millisecond)

st := Merge(st1, st2, st3)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Expand Down
27 changes: 25 additions & 2 deletions group.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ type group struct {
states []State
toClose map[int]struct{}

done chan struct{}
finished chan struct{}
done, finished chan struct{}
ready chan struct{}

sync.RWMutex
}
Expand Down Expand Up @@ -47,6 +47,7 @@ func merge(states ...State) *group {
// already closed
default:
toClose[i] = struct{}{}

addToCloseStream(done, s)
}
}
Expand Down Expand Up @@ -80,6 +81,28 @@ func (g *group) Wait() {
}
}

func (g *group) Ready() <-chan struct{} {
g.Lock()
defer g.Unlock()

if g.ready != nil {
// To avoid memory leaks - ready channel is created only once
return g.ready
}

g.ready = make(chan struct{})

go func() {
for _, m := range g.states {
<-m.Ready()
}

close(g.ready)
}()

return g.ready
}

func (g *group) close() {
g.Lock()
select {
Expand Down
75 changes: 75 additions & 0 deletions readiness.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package state

import (
"sync"
)

type readinessState struct {
*group

ready chan struct{}
readyOut chan struct{}

sync.Mutex
}

// ReadinessTail detaches after readiness state initialization.
// The tail is supposed to stay in a background job associated with
// created State as it carries readiness signal.
type ReadinessTail interface {
// Ok sends a signal that background job is ready.
// Not calling Ok will block all parents readiness and cause
// the channel from State's Ready call to block forever.
// After the first call, subsequent calls do nothing.
Ok()
}

func (r *readinessState) Ok() {
r.Lock()
defer r.Unlock()

select {
case <-r.ready:
// Already ready
default:
close(r.ready)
}
}

func WithReadiness(children ...State) (State, ReadinessTail) {
m := withReadiness(children...)
return m, m
}

func withReadiness(children ...State) *readinessState {
s := &readinessState{
group: merge(children...),
ready: make(chan struct{}),
}

return s
}

func (r *readinessState) Ready() <-chan struct{} {
r.Lock()
defer r.Unlock()

if r.readyOut != nil {
// To avoid memory leaks - readyOut channel is created only once
return r.readyOut
}

r.readyOut = make(chan struct{})

go func() {
<-r.group.Ready()
<-r.ready
close(r.readyOut)
}()

return r.readyOut
}

func (r *readinessState) DependsOn(children ...State) State {
return withDependency(r, children...)
}
11 changes: 10 additions & 1 deletion state.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import (
// State's methods may be called by multiple goroutines simultaneously.
type State interface {
// Err returns the first encountered error in this state.
// During propagating error from bottom to top, it is being annotated
// While error is propagated from bottom to top, it is being annotated
// by annotation states in a chain. Annotation uses introduced in
// go 1.13 errors wrapping.
//
Expand All @@ -86,6 +86,15 @@ type State interface {
// in this case, it is considered as fully completed and returns nil.
Shutdown(ctx context.Context) error

// Ready returns a channel that signals that all states in tree are
// ready. If there is no readiness states in the tree - state is considered
// as ready by default.
//
// If some readiness state in the tree didn't send Ok signal -
// returned channel blocks forever. It is caller's responsibility to
// handle possible block.
Ready() <-chan struct{}

// Value returns the first found value in this state for key,
// or nil if no value is associated with key. The tree is searched
// from top to bottom and from left to right.
Expand Down
Loading

0 comments on commit d9a0002

Please sign in to comment.