Skip to content

Commit

Permalink
[core] Make sure EOR time is later than EOX time
Browse files Browse the repository at this point in the history
PDP requires that SOR < SOX < EOX < EOR. To achieve this, setting run_end_time_ms was moved so that there is an opportunity to run CTP run stop hooks before that.
  • Loading branch information
knopers8 committed Nov 28, 2023
1 parent 5cab192 commit acbeec1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 18 deletions.
43 changes: 25 additions & 18 deletions core/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,24 +194,6 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
} else {
log.Error("cannot access AliECS workflow configuration defaults")
}
} else if e.Event == "STOP_ACTIVITY" {
endTime, ok := env.workflow.GetUserVars().Get("run_end_time_ms")
if ok && endTime == "" {
runEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
env.workflow.SetRuntimeVar("run_end_time_ms", runEndTime)
} else {
log.WithField("partition", envId.String()).
Debug("O2 End time already set before before_STOP_ACTIVITY")
}
} else if e.Event == "GO_ERROR" {
endTime, ok := env.workflow.GetUserVars().Get("run_end_time_ms")
if ok && endTime == "" {
runEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
env.workflow.SetRuntimeVar("run_end_time_ms", runEndTime)
} else {
log.WithField("partition", envId.String()).
Debug("O2 End time already set before before_GO_ERROR")
}
}

if rn := env.GetCurrentRunNumber(); rn != 0 {
Expand All @@ -231,6 +213,17 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
if errHooks != nil {
e.Cancel(errHooks)
}

if e.Event == "STOP_ACTIVITY" {
endTime, ok := env.workflow.GetUserVars().Get("run_end_time_ms")
if ok && endTime == "" {
runEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
env.workflow.SetRuntimeVar("run_end_time_ms", runEndTime)
} else {
log.WithField("partition", envId.String()).
Debug("O2 End time already set before before_STOP_ACTIVITY")
}
}
},
"leave_state": func(_ context.Context, e *fsm.Event) {
errHooks := env.handleHooks(env.Workflow(), fmt.Sprintf("leave_%s", e.Src))
Expand Down Expand Up @@ -261,6 +254,19 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
"after_event": func(_ context.Context, e *fsm.Event) {
defer func() { env.currentTransition = "" }()

// we set the run end time after all the transition is complete to allow trigger to finish first,
// which is a requirement from PDP (SOR < SOX < EOX < EOR)
if e.Event == "GO_ERROR" {
endTime, ok := env.workflow.GetUserVars().Get("run_end_time_ms")
if ok && endTime == "" {
runEndTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
env.workflow.SetRuntimeVar("run_end_time_ms", runEndTime)
} else {
log.WithField("partition", envId.String()).
Debug("O2 End time already set before after_GO_ERROR")
}
}

errHooks := env.handleHooks(env.Workflow(), fmt.Sprintf("after_%s", e.Event))
if errHooks != nil {
e.Cancel(errHooks)
Expand Down Expand Up @@ -303,6 +309,7 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
env.currentRunNumber = 0
env.workflow.GetVars().Del("run_number")
env.workflow.GetVars().Del("runNumber")

// Ensure the auto stop timer is stopped (important for stop transitions NOT triggered by the timer itself)
env.invalidateAutoStopTransition()
}
Expand Down
15 changes: 15 additions & 0 deletions core/integration/ccdb/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,21 @@ func (p *Plugin) CallStack(data interface{}) (stack map[string]interface{}) {
Debug("probably went to ERROR while not in RUNNING, doing nothing")
return
}

trgEndTimeMsUint, _ := strconv.ParseUint(grp.trgEndTimeMs, 10, 32)
o2EndTimeMsUint, _ := strconv.ParseUint(grp.runEndTimeMs, 10, 32)
if trgEndTimeMsUint == 0 {
log.WithField("call", "RunStop").
WithField("partition", envId).
WithField("level", infologger.IL_Ops).
Error("End of trigger time will be missing in the CCDB GRPECS. Please create a log entry in BK and tag PDP")
}
if trgEndTimeMsUint > o2EndTimeMsUint {
log.WithField("call", "RunStop").
WithField("partition", envId).
WithField("level", infologger.IL_Ops).
Errorf("End of trigger time will be later than end of run time (%d > %d) in the CCDB GRPECS. Please create a log entry in BK and tag PDP", trgEndTimeMsUint, o2EndTimeMsUint)
}
_, runExists := p.existingRuns[grp.runNumber]
if runExists {
delete(p.existingRuns, grp.runNumber)
Expand Down

0 comments on commit acbeec1

Please sign in to comment.