From f8fcc56398dd076cf4e31f44e87c3f0f22170410 Mon Sep 17 00:00:00 2001 From: widmogrod Date: Sat, 4 Nov 2023 19:35:23 +0100 Subject: [PATCH] x/workflow Stop/Resume operate on ParentRunID --- x/workflow/workflow_machine.go | 14 +++++++++----- x/workflow/workflow_machine_test.go | 7 +++++-- x/workflow/workflow_transition.go | 9 ++++++--- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/x/workflow/workflow_machine.go b/x/workflow/workflow_machine.go index 6669f00f..903f6683 100644 --- a/x/workflow/workflow_machine.go +++ b/x/workflow/workflow_machine.go @@ -40,11 +40,11 @@ type ( } TryRecover struct{} StopSchedule struct { - // Run can be stopped by user, or by system - RunID string + // ParentRunID can be stopped by user, or by system + ParentRunID string } ResumeSchedule struct { - RunID string + ParentRunID string } ) @@ -74,10 +74,14 @@ type ( Scheduled struct { // ExpectedRunTimestamp is server timestamp + DelayBySeconds ExpectedRunTimestamp int64 - BaseState BaseState + // ParentRunID is a reference to the original run, that scheduled this run and any between + // ParentRunID is used to track history of the execution, and is stable reference to the original run + ParentRunID string + BaseState BaseState } ScheduleStopped struct { - BaseState BaseState + ParentRunID string + BaseState BaseState } ) diff --git a/x/workflow/workflow_machine_test.go b/x/workflow/workflow_machine_test.go index 0cad58bd..6c005382 100644 --- a/x/workflow/workflow_machine_test.go +++ b/x/workflow/workflow_machine_test.go @@ -261,6 +261,7 @@ func TestMachine(t *testing.T) { }, }). ThenState(&Scheduled{ + ParentRunID: runID, ExpectedRunTimestamp: di.TimeNow().Add(time.Duration(10) * time.Second).Unix(), BaseState: BaseState{ RunID: runID, @@ -300,9 +301,10 @@ func TestMachine(t *testing.T) { ForkCase("stop execution", func(c *machine.Case[Command, State]) { c. GivenCommand(&StopSchedule{ - RunID: runID, + ParentRunID: runID, }). ThenState(&ScheduleStopped{ + ParentRunID: runID, BaseState: BaseState{ RunID: runID, StepID: "", @@ -318,9 +320,10 @@ func TestMachine(t *testing.T) { }, }). GivenCommand(&ResumeSchedule{ - RunID: runID, + ParentRunID: runID, }). ThenState(&Scheduled{ + ParentRunID: runID, ExpectedRunTimestamp: di.TimeNow().Add(time.Duration(10) * time.Second).Unix(), BaseState: BaseState{ RunID: runID, diff --git a/x/workflow/workflow_transition.go b/x/workflow/workflow_transition.go index 0bedf935..a33eec8d 100644 --- a/x/workflow/workflow_transition.go +++ b/x/workflow/workflow_transition.go @@ -86,6 +86,7 @@ func Transition(cmd Command, state State, dep Dependency) (State, error) { // schedule or delay execution return &Scheduled{ + ParentRunID: context.RunID, ExpectedRunTimestamp: runTimestamp, BaseState: context, }, nil @@ -156,12 +157,13 @@ func Transition(cmd Command, state State, dep Dependency) (State, error) { func(x *StopSchedule) (State, error) { switch s := state.(type) { case *Scheduled: - if s.BaseState.RunID != x.RunID { + if s.ParentRunID != x.ParentRunID { return nil, ErrRunIDNotMatch } return &ScheduleStopped{ - BaseState: s.BaseState, + ParentRunID: x.ParentRunID, + BaseState: s.BaseState, }, nil } @@ -170,7 +172,7 @@ func Transition(cmd Command, state State, dep Dependency) (State, error) { func(x *ResumeSchedule) (State, error) { switch s := state.(type) { case *ScheduleStopped: - if s.BaseState.RunID != x.RunID { + if s.ParentRunID != x.ParentRunID { return nil, ErrRunIDNotMatch } @@ -181,6 +183,7 @@ func Transition(cmd Command, state State, dep Dependency) (State, error) { return &Scheduled{ ExpectedRunTimestamp: runTimestamp, + ParentRunID: x.ParentRunID, BaseState: s.BaseState, }, nil }