Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][executor] more details in logs related to handling roles #538

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/workflow/aggregatorrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func (r *aggregatorRole) updateStatus(s task.Status) {
"child status": s.String(),
"aggregator status": r.status.get().String(),
"aggregator role": r.Name,
"partition": r.GetEnvironmentId().String(),
}).
Trace("aggregator role about to merge incoming child status")
r.status.merge(s, r)
Expand All @@ -252,8 +253,10 @@ func (r *aggregatorRole) updateState(s task.State) {
if r == nil {
return
}
log.WithField("role", r.Name).WithField("state", s.String()).Trace("updating state")
r.state.merge(s, r)
log.WithField("role", r.Name).
WithField("partition", r.GetEnvironmentId().String()).
Tracef("updated state to %s upon input state %s", r.state.get().String(), s.String())
r.SendEvent(&event.RoleEvent{Name: r.Name, State: r.state.get().String(), RolePath: r.GetPath()})
if r.parent != nil {
r.parent.updateState(r.state.get())
Expand Down
12 changes: 9 additions & 3 deletions core/workflow/callrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@ func (t *callRole) UpdateState(s task.State) {

func (t *callRole) updateStatus(s task.Status) {
if t.parent == nil {
log.WithField("status", s.String()).Error("cannot update status with nil parent")
log.WithField("status", s.String()).
WithField("partition", t.GetEnvironmentId().String()).
Error("cannot update status with nil parent")
}
t.status.merge(s, t)
t.SendEvent(&event.RoleEvent{Name: t.Name, Status: t.status.get().String(), RolePath: t.GetPath()})
Expand All @@ -217,10 +219,14 @@ func (t *callRole) updateStatus(s task.Status) {

func (t *callRole) updateState(s task.State) {
if t.parent == nil {
log.WithField("state", s.String()).Error("cannot update state with nil parent")
log.WithField("state", s.String()).
WithField("partition", t.GetEnvironmentId().String()).
Error("cannot update state with nil parent")
}
log.WithField("role", t.Name).WithField("state", s.String()).Trace("updating state")
t.state.merge(s, t)
log.WithField("role", t.Name).
WithField("partition", t.GetEnvironmentId().String()).
Tracef("updated state to %s upon input state %s", t.state.get().String(), s.String())
t.SendEvent(&event.RoleEvent{Name: t.Name, State: t.state.get().String(), RolePath: t.GetPath()})
t.parent.updateState(s)
}
Expand Down
12 changes: 9 additions & 3 deletions core/workflow/taskrole.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ func (t *taskRole) UpdateState(s task.State) {

func (t *taskRole) updateStatus(s task.Status) {
if t.parent == nil {
log.WithField("status", s.String()).Error("cannot update status with nil parent")
log.WithField("status", s.String()).
WithField("partition", t.GetEnvironmentId().String()).
Error("cannot update status with nil parent")
}
t.status.merge(s, t)
t.SendEvent(&event.RoleEvent{Name: t.Name, Status: t.status.get().String(), RolePath: t.GetPath()})
Expand All @@ -221,10 +223,14 @@ func (t *taskRole) updateStatus(s task.Status) {

func (t *taskRole) updateState(s task.State) {
if t.parent == nil {
log.WithField("state", s.String()).Error("cannot update state with nil parent")
log.WithField("state", s.String()).
WithField("partition", t.GetEnvironmentId().String()).
Error("cannot update state with nil parent")
}
log.WithField("role", t.Name).WithField("state", s.String()).Trace("updating state")
t.state.merge(s, t)
log.WithField("role", t.Name).
WithField("partition", t.GetEnvironmentId().String()).
Tracef("updated state to %s upon input state %s", t.state.get().String(), s.String())
t.SendEvent(&event.RoleEvent{Name: t.Name, State: t.state.get().String(), RolePath: t.GetPath()})
t.parent.updateState(s)
}
Expand Down
18 changes: 16 additions & 2 deletions executor/executable/controllabletask.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,8 +467,10 @@ func (t *ControllableTask) Launch() error {
if err != nil {
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("detector", t.knownDetector).
WithField("taskId", t.ti.TaskID.GetValue()).
WithField("taskName", t.ti.Name).
WithError(err).
Warning("error marshaling message from task")
Warning("error marshaling message")
} else {
t.sendMessage(jsonEvent)
log.WithField("partition", t.knownEnvironmentId.String()).
Expand All @@ -492,6 +494,8 @@ func (t *ControllableTask) Launch() error {
if t.rpc == nil {
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("detector", t.knownDetector).
WithField("taskId", deo.TaskId.GetValue()).
WithField("taskName", t.ti.Name).
WithError(err).
Debug("event stream done")
break
Expand All @@ -500,6 +504,8 @@ func (t *ControllableTask) Launch() error {
if err == io.EOF {
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("detector", t.knownDetector).
WithField("taskId", deo.TaskId.GetValue()).
WithField("taskName", t.ti.Name).
WithError(err).
Debug("event stream EOF")
break
Expand All @@ -510,7 +516,9 @@ func (t *ControllableTask) Launch() error {
WithField("detector", t.knownDetector).
WithField("errorType", reflect.TypeOf(err)).
WithField("level", infologger.IL_Devel).
Warningf("error receiving event from task %s", deo.TaskId.String())
WithField("taskId", deo.TaskId.GetValue()).
WithField("taskName", t.ti.Name).
Warning("error receiving event")
if status.Code(err) == codes.Unavailable {
break
}
Expand All @@ -522,6 +530,8 @@ func (t *ControllableTask) Launch() error {
if deviceEvent == nil {
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("detector", t.knownDetector).
WithField("taskId", deo.TaskId.GetValue()).
WithField("taskName", t.ti.Name).
Debug("nil DeviceEvent received (NULL_DEVICE_EVENT) - closing stream")
break
} else {
Expand Down Expand Up @@ -591,10 +601,14 @@ func (t *ControllableTask) Launch() error {
_ = t.rpc.Close() // NOTE: might return non-nil error, but we don't care much
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("detector", t.knownDetector).
WithField("taskId", t.ti.TaskID.GetValue()).
WithField("taskName", t.ti.Name).
Debug("rpc client closed")
t.rpc = nil
log.WithField("partition", t.knownEnvironmentId.String()).
WithField("detector", t.knownDetector).
WithField("taskId", t.ti.TaskID.GetValue()).
WithField("taskName", t.ti.Name).
Debug("rpc client removed")
}

Expand Down
Loading