Skip to content

Commit

Permalink
[OCTRL-661] [core] Add state enter timestamp to kafka plugin messages
Browse files Browse the repository at this point in the history
  • Loading branch information
knopers8 authored and teo committed Jun 9, 2022
1 parent 07df415 commit 98fc5bd
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 242 deletions.
9 changes: 8 additions & 1 deletion core/environment/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
)
env.GlobalVars.Set("__fmq_cleanup_count", "0") // initialize to 0 the number of START transitions

// We start with STANDBY, which will not be preceded with enter_STANDBY, thus we set the value here.
enterStateTimeMs := strconv.FormatInt(time.Now().UnixMilli(), 10)
env.UserVars.Set("enter_state_time_ms", enterStateTimeMs)

env.Sm = fsm.NewFSM(
"STANDBY",
fsm.Events{
Expand Down Expand Up @@ -153,7 +157,7 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
env.workflow.GetVars().Set("run_number", rnString)
env.workflow.GetVars().Set("runNumber", rnString)

runStartTime := strconv.FormatInt(time.Now().UnixNano()/1000000, 10)
runStartTime := strconv.FormatInt(time.Now().UnixMilli(), 10)
env.workflow.SetRuntimeVar("run_start_time_ms", runStartTime)
env.workflow.SetRuntimeVar("run_end_time_ms", "") // we delete previous EOR

Expand Down Expand Up @@ -200,6 +204,9 @@ func newEnvironment(userVars map[string]string) (env *Environment, err error) {
env.handlerFunc()(e)
},
"enter_state": func(e *fsm.Event) {
enterStateTimeMs := strconv.FormatInt(time.Now().UnixMilli(), 10)
env.workflow.SetRuntimeVar("enter_state_time_ms", enterStateTimeMs)

errHooks := env.handleHooks(env.Workflow(), fmt.Sprintf("enter_%s", e.Dst))
if errHooks != nil {
e.Cancel(errHooks)
Expand Down
27 changes: 22 additions & 5 deletions core/integration/kafka/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ func (p *Plugin) newEnvStateObject(varStack map[string]string, call string) *kaf
}
}

enterStateTimeMsStr, ok := varStack["enter_state_time_ms"]
if !ok {
log.WithField("call", call).
WithField("partition", envId).
Error("cannot acquire enter_state_time_ms")
return nil
}
enterStateTimeMs, err := strconv.ParseUint(enterStateTimeMsStr, 10, 64)
if err != nil {
log.WithError(err).
WithField("call", call).
WithField("partition", envId).
Errorf("cannot convert enter_state_time_ms (%s) to an unsigned integer", enterStateTimeMsStr)
return nil
}

detectorsStr, ok := varStack["detectors"]
if !ok {
log.WithField("call", call).
Expand All @@ -211,11 +227,12 @@ func (p *Plugin) newEnvStateObject(varStack map[string]string, call string) *kaf
}

return &kafkapb.EnvInfo{
EnvironmentId: envId,
RunNumber: runNumberOpt,
RunType: runTypeOpt,
State: state,
Detectors: detectorsSlice,
EnvironmentId: envId,
RunNumber: runNumberOpt,
RunType: runTypeOpt,
State: state,
Detectors: detectorsSlice,
EnterStateTimestamp: enterStateTimeMs,
}
}

Expand Down
Loading

0 comments on commit 98fc5bd

Please sign in to comment.