Skip to content

Commit

Permalink
x/workflow Stop/Resume operate on ParentRunID
Browse files Browse the repository at this point in the history
  • Loading branch information
widmogrod committed Nov 4, 2023
1 parent 877acad commit f8fcc56
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
14 changes: 9 additions & 5 deletions x/workflow/workflow_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ type (
}
TryRecover struct{}
StopSchedule struct {
// Run can be stopped by user, or by system
RunID string
// ParentRunID can be stopped by user, or by system
ParentRunID string
}
ResumeSchedule struct {
RunID string
ParentRunID string
}
)

Expand Down Expand Up @@ -74,10 +74,14 @@ type (
Scheduled struct {
// ExpectedRunTimestamp is server timestamp + DelayBySeconds
ExpectedRunTimestamp int64
BaseState BaseState
// ParentRunID is a reference to the original run, that scheduled this run and any between
// ParentRunID is used to track history of the execution, and is stable reference to the original run
ParentRunID string
BaseState BaseState
}
ScheduleStopped struct {
BaseState BaseState
ParentRunID string
BaseState BaseState
}
)

Expand Down
7 changes: 5 additions & 2 deletions x/workflow/workflow_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ func TestMachine(t *testing.T) {
},
}).
ThenState(&Scheduled{
ParentRunID: runID,
ExpectedRunTimestamp: di.TimeNow().Add(time.Duration(10) * time.Second).Unix(),
BaseState: BaseState{
RunID: runID,
Expand Down Expand Up @@ -300,9 +301,10 @@ func TestMachine(t *testing.T) {
ForkCase("stop execution", func(c *machine.Case[Command, State]) {
c.
GivenCommand(&StopSchedule{
RunID: runID,
ParentRunID: runID,
}).
ThenState(&ScheduleStopped{
ParentRunID: runID,
BaseState: BaseState{
RunID: runID,
StepID: "",
Expand All @@ -318,9 +320,10 @@ func TestMachine(t *testing.T) {
},
}).
GivenCommand(&ResumeSchedule{
RunID: runID,
ParentRunID: runID,
}).
ThenState(&Scheduled{
ParentRunID: runID,
ExpectedRunTimestamp: di.TimeNow().Add(time.Duration(10) * time.Second).Unix(),
BaseState: BaseState{
RunID: runID,
Expand Down
9 changes: 6 additions & 3 deletions x/workflow/workflow_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func Transition(cmd Command, state State, dep Dependency) (State, error) {

// schedule or delay execution
return &Scheduled{
ParentRunID: context.RunID,
ExpectedRunTimestamp: runTimestamp,
BaseState: context,
}, nil
Expand Down Expand Up @@ -156,12 +157,13 @@ func Transition(cmd Command, state State, dep Dependency) (State, error) {
func(x *StopSchedule) (State, error) {
switch s := state.(type) {
case *Scheduled:
if s.BaseState.RunID != x.RunID {
if s.ParentRunID != x.ParentRunID {
return nil, ErrRunIDNotMatch
}

return &ScheduleStopped{
BaseState: s.BaseState,
ParentRunID: x.ParentRunID,
BaseState: s.BaseState,
}, nil
}

Expand All @@ -170,7 +172,7 @@ func Transition(cmd Command, state State, dep Dependency) (State, error) {
func(x *ResumeSchedule) (State, error) {
switch s := state.(type) {
case *ScheduleStopped:
if s.BaseState.RunID != x.RunID {
if s.ParentRunID != x.ParentRunID {
return nil, ErrRunIDNotMatch
}

Expand All @@ -181,6 +183,7 @@ func Transition(cmd Command, state State, dep Dependency) (State, error) {

return &Scheduled{
ExpectedRunTimestamp: runTimestamp,
ParentRunID: x.ParentRunID,
BaseState: s.BaseState,
}, nil
}
Expand Down

0 comments on commit f8fcc56

Please sign in to comment.